/*-------------------------------------------------------------------------
 *
 * tuplesort.c
 *      Generalized tuple sorting routines.
 *
 * This module handles sorting of heap tuples, index tuples, or single
 * Datums (and could easily support other kinds of sortable objects,
 * if necessary).  It works efficiently for both small and large amounts
 * of data.  Small amounts are sorted in-memory using qsort().  Large
 * amounts are sorted using temporary files and a standard external sort
 * algorithm.
 *
 * See Knuth, volume 3, for more than you want to know about the external
 * sorting algorithm.  Historically, we divided the input into sorted runs
 * using replacement selection, in the form of a priority tree implemented
 * as a heap (essentially his Algorithm 5.2.3H), but now we only do that
 * for the first run, and only if the run would otherwise end up being very
 * short.  We merge the runs using polyphase merge, Knuth's Algorithm
 * 5.4.2D.  The logical "tapes" used by Algorithm D are implemented by
 * logtape.c, which avoids space wastage by recycling disk space as soon
 * as each block is read from its "tape".
 *
 * We do not use Knuth's recommended data structure (Algorithm 5.4.1R) for
 * the replacement selection, because it uses a fixed number of records
 * in memory at all times.  Since we are dealing with tuples that may vary
 * considerably in size, we want to be able to vary the number of records
 * kept in memory to ensure full utilization of the allowed sort memory
 * space.  So, we keep the tuples in a variable-size heap, with the next
 * record to go out at the top of the heap.  Like Algorithm 5.4.1R, each
 * record is stored with the run number that it must go into, and we use
 * (run number, key) as the ordering key for the heap.  When the run number
 * at the top of the heap changes, we know that no more records of the prior
 * run are left in the heap.  Note that there are in practice only ever two
 * distinct run numbers, because as of PostgreSQL 9.6 we only use
 * replacement selection to form the first run.
 *
 * In PostgreSQL 9.6, a heap (based on Knuth's Algorithm H, with some small
 * customizations) is only used with the aim of producing just one run,
 * thereby avoiding all merging.  Only the first run can use replacement
 * selection, which is why there are now only two possible valid run
 * numbers, and why heapification is customized to not distinguish between
 * tuples in the second run (those will be quicksorted).  We generally
 * prefer a simple hybrid sort-merge strategy, where runs are sorted in much
 * the same way as the entire input of an internal sort is sorted (using
 * qsort()).  The replacement_sort_tuples GUC controls the limited remaining
 * use of replacement selection for the first run.
 *
 * There are several reasons to favor a hybrid sort-merge strategy.
 * Maintaining a priority tree/heap has poor CPU cache characteristics.
 * Furthermore, the growth in main memory sizes has greatly diminished the
 * value of having runs that are larger than available memory, even in the
 * case where there is partially sorted input and runs can be made far
 * larger by using a heap.  In most cases, a single-pass merge step is all
 * that is required even when runs are no larger than available memory.
 * Avoiding multiple merge passes was traditionally considered to be the
 * major advantage of using replacement selection.
 *
 * The approximate amount of memory allowed for any one sort operation
 * is specified in kilobytes by the caller (most pass work_mem).  Initially,
 * we absorb tuples and simply store them in an unsorted array as long as
 * we haven't exceeded workMem.  If we reach the end of the input without
 * exceeding workMem, we sort the array using qsort() and subsequently return
 * tuples just by scanning the tuple array sequentially.  If we do exceed
 * workMem, we begin to emit tuples into sorted runs in temporary tapes.
 * When tuples are dumped in batch after quicksorting, we begin a new run
 * with a new output tape (selected per Algorithm D).  After the end of the
 * input is reached, we dump out remaining tuples in memory into a final run
 * (or two, when replacement selection is still used), then merge the runs
 * using Algorithm D.
 *
 * When merging runs, we use a heap containing just the frontmost tuple from
 * each source run; we repeatedly output the smallest tuple and replace it
 * with the next tuple from its source tape (if any).  When the heap empties,
 * the merge is complete.  The basic merge algorithm thus needs very little
 * memory --- only M tuples for an M-way merge, and M is constrained to a
 * small number.  However, we can still make good use of our full workMem
 * allocation by pre-reading additional blocks from each source tape.  Without
 * prereading, our access pattern to the temporary file would be very erratic;
 * on average we'd read one block from each of M source tapes during the same
 * time that we're writing M blocks to the output tape, so there is no
 * sequentiality of access at all, defeating the read-ahead methods used by
 * most Unix kernels.  Worse, the output tape gets written into a very random
 * sequence of blocks of the temp file, ensuring that things will be even
 * worse when it comes time to read that tape.  A straightforward merge pass
 * thus ends up doing a lot of waiting for disk seeks.  We can improve matters
 * by prereading from each source tape sequentially, loading about workMem/M
 * bytes from each tape in turn, and making the sequential blocks immediately
 * available for reuse.  This approach helps to localize both read and write
 * accesses.  The pre-reading is handled by logtape.c, we just tell it how
 * much memory to use for the buffers.
 *
 * When the caller requests random access to the sort result, we form
 * the final sorted run on a logical tape which is then "frozen", so
 * that we can access it randomly.  When the caller does not need random
 * access, we return from tuplesort_performsort() as soon as we are down
 * to one run per logical tape.  The final merge is then performed
 * on-the-fly as the caller repeatedly calls tuplesort_getXXX; this
 * saves one cycle of writing all the data out to disk and reading it in.
 *
 * Before Postgres 8.2, we always used a seven-tape polyphase merge, on the
 * grounds that 7 is the "sweet spot" on the tapes-to-passes curve according
 * to Knuth's figure 70 (section 5.4.2).  However, Knuth is assuming that
 * tape drives are expensive beasts, and in particular that there will always
 * be many more runs than tape drives.  In our implementation a "tape drive"
 * doesn't cost much more than a few Kb of memory buffers, so we can afford
 * to have lots of them.  In particular, if we can have as many tape drives
 * as sorted runs, we can eliminate any repeated I/O at all.  In the current
 * code we determine the number of tapes M on the basis of workMem: we want
 * workMem/M to be large enough that we read a fair amount of data each time
 * we preread from a tape, so as to maintain the locality of access described
 * above.  Nonetheless, with large workMem we can have many tapes (but not
 * too many -- see the comments in tuplesort_merge_order).
 *
 *
 * Portions Copyright (c) 2012-2014, TransLattice, Inc.
 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * This source code file contains modifications made by THL A29 Limited ("Tencent Modifications").
 * All Tencent Modifications are Copyright (C) 2023 THL A29 Limited.
 *
 * IDENTIFICATION
 *      src/backend/utils/sort/tuplesort.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include <limits.h>

#include "access/htup_details.h"
#include "access/nbtree.h"
#include "access/hash.h"
#include "catalog/index.h"
#include "catalog/pg_am.h"
#include "commands/tablespace.h"
#include "executor/executor.h"
#include "miscadmin.h"
#include "pg_trace.h"
#ifdef PGXC
#include "pgxc/execRemote.h"
#include "catalog/pgxc_node.h"
#endif
#include "utils/datum.h"
#include "utils/logtape.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/pg_rusage.h"
#include "utils/rel.h"
#include "utils/sortsupport.h"
#include "utils/tuplesort.h"


/*
 * sort-type codes for sort__start probes
 *
 * One small integer per kind of sort.  MERGE_SORT is only defined in PGXC
 * builds.
 */
#define HEAP_SORT        0
#define INDEX_SORT        1
#define DATUM_SORT        2
#define CLUSTER_SORT    3
#ifdef PGXC
#define MERGE_SORT 4
#endif

/*
 * GUC variables
 *
 * Each variable only exists when the matching compile-time symbol is defined.
 */
#ifdef TRACE_SORT
/*
 * Off by default.  NOTE(review): presumably enables sort tracing output;
 * confirm against the code that tests trace_sort.
 */
bool        trace_sort = false;
#endif

#ifdef DEBUG_BOUNDED_SORT
/*
 * On by default.  NOTE(review): presumably lets the bounded-sort
 * optimization be switched off for debugging; confirm against its users.
 */
bool        optimize_bounded_sort = true;
#endif


/*
 * The objects we actually sort are SortTuple structs.  These contain
 * a pointer to the tuple proper (might be a MinimalTuple or IndexTuple),
 * which is a separate palloc chunk --- we assume it is just one chunk and
 * can be freed by a simple pfree() (except during merge, when we use a
 * simple slab allocator).  SortTuples also contain the tuple's first key
 * column in Datum/nullflag format, and an index integer.
 *
 * Storing the first key column lets us save heap_getattr or index_getattr
 * calls during tuple comparisons.  We could extract and save all the key
 * columns not just the first, but this would increase code complexity and
 * overhead, and wouldn't actually save any comparison cycles in the common
 * case where the first key determines the comparison result.  Note that
 * for a pass-by-reference datatype, datum1 points into the "tuple" storage.
 *
 * There is one special case: when the sort support infrastructure provides an
 * "abbreviated key" representation, where the key is (typically) a pass by
 * value proxy for a pass by reference type.  In this case, the abbreviated key
 * is stored in datum1 in place of the actual first key column.
 *
 * When sorting single Datums, the data value is represented directly by
 * datum1/isnull1 for pass by value types (or null values).  If the datatype is
 * pass-by-reference and isnull1 is false, then "tuple" points to a separately
 * palloc'd data value, otherwise "tuple" is NULL.  The value of datum1 is then
 * either the same pointer as "tuple", or is an abbreviated key value as
 * described above.  Accordingly, "tuple" is always used in preference to
 * datum1 as the authoritative value for pass-by-reference cases.
 *
 * While building initial runs, tupindex holds the tuple's run number.
 * Historically, the run number could meaningfully distinguish many runs, but
 * it now only distinguishes RUN_FIRST and HEAP_RUN_NEXT, since replacement
 * selection is always abandoned after the first run; no other run number
 * should be represented here.  During merge passes, we re-use it to hold the
 * input tape number that each tuple in the heap was read from.  tupindex goes
 * unused if the sort occurs entirely in memory.
 */
/*
 * One sortable item: a pointer to the out-of-line tuple plus a cached copy of
 * its leading key, so that most comparisons need not touch the tuple itself.
 */
typedef struct
{
    void       *tuple;            /* the tuple itself; NULL for a single Datum
                                 * that is pass-by-value or null */
    Datum        datum1;            /* value of first key column, or its
                                 * abbreviated key when abbreviation is used */
    bool        isnull1;        /* is first key column NULL? */
    int            tupindex;        /* run number while building initial runs;
                                 * source tape number during merge passes;
                                 * unused for purely in-memory sorts */
} SortTuple;

/*
 * During merge, we use a pre-allocated set of fixed-size slots to hold
 * tuples, to avoid palloc/pfree overhead.
 *
 * Merge doesn't require a lot of memory, so we can afford to waste some,
 * by using gratuitously-sized slots.  If a tuple is larger than 1 kB, the
 * palloc() overhead is not significant anymore.
 *
 * 'nextfree' is valid when this chunk is in the free list.  When in use, the
 * slot holds a tuple.
 */
/* Size in bytes of each slab slot; also the size of the SlabSlot union. */
#define SLAB_SLOT_SIZE 1024

/*
 * A single slab slot.  The two members overlay each other: a free slot uses
 * the storage as a free-list link, an in-use slot uses it as tuple storage.
 */
typedef union SlabSlot
{
    union SlabSlot *nextfree;    /* next slot in free list (free slots only) */
    char        buffer[SLAB_SLOT_SIZE]; /* tuple storage (in-use slots only) */
} SlabSlot;

/*
 * Possible states of a Tuplesort object.  These denote the states that
 * persist between calls of Tuplesort routines.
 */
/*
 * State of a Tuplesort object between calls.  The first three values
 * describe the loading phase, the last three describe a finished sort.
 */
typedef enum
{
    TSS_INITIAL,                /* Loading tuples; still within memory limit */
    TSS_BOUNDED,                /* Loading tuples into bounded-size heap */
    TSS_BUILDRUNS,                /* Loading tuples; writing to tape */
    TSS_SORTEDINMEM,            /* Sort completed entirely in memory */
    TSS_SORTEDONTAPE,            /* Sort completed, final run is on tape */
    TSS_FINALMERGE                /* Performing final merge on-the-fly */
} TupSortStatus;

/*
 * Parameters for calculation of number of tapes to use --- see inittapes()
 * and tuplesort_merge_order().
 *
 * In this calculation we assume that each tape will cost us about one block's
 * worth of buffer space.  This ignores the overhead of all the other data
 * structures needed for each tape, but it's probably close enough.
 *
 * MERGE_BUFFER_SIZE is how much data we'd like to read from each input
 * tape during a preread cycle (see discussion at top of file).
 */
#define MINORDER        6        /* minimum merge order */
#define MAXORDER        500        /* maximum merge order */
/* assumed buffer-space cost of one tape: a single block */
#define TAPE_BUFFER_OVERHEAD        BLCKSZ
/* amount of data we'd like to read from each input tape per preread cycle */
#define MERGE_BUFFER_SIZE            (BLCKSZ * 32)

 /*
  * Run numbers, used during external sort operations.
  *
  * HEAP_RUN_NEXT is only used for SortTuple.tupindex, never state.currentRun.
  */
/* number of the first run; state.currentRun starts here */
#define RUN_FIRST        0
/*
 * SortTuple.tupindex marker for heap tuples that are not part of RUN_FIRST.
 * It is INT_MAX, so it compares greater than any real run number.
 */
#define HEAP_RUN_NEXT    INT_MAX
/* number of the run that follows RUN_FIRST */
#define RUN_SECOND        1

/*
 * Signature of the per-sort-kind comparison callbacks (the comparetup_xxx
 * functions).  The result follows the qsort() convention: <0, 0 or >0
 * according as a<b, a=b or a>b.
 */
typedef int (*SortTupleComparator) (const SortTuple *a, const SortTuple *b,
                                    Tuplesortstate *state);

/*
 * Private state of a Tuplesort operation.
 */
struct Tuplesortstate
{
    TupSortStatus status;        /* enumerated value as shown above */
    int            nKeys;            /* number of columns in sort key */
    bool        randomAccess;    /* did caller request random access? */
    bool        bounded;        /* did caller specify a maximum number of
                                 * tuples to return? */
    bool        boundUsed;        /* true if we made use of a bounded heap */
    int            bound;            /* if bounded, the maximum number of tuples */
    bool        tuples;            /* Can SortTuple.tuple ever be set? */
    int64        availMem;        /* remaining memory available, in bytes */
    int64        allowedMem;        /* total memory allowed, in bytes */
    int            maxTapes;        /* number of tapes (Knuth's T) */
    int            tapeRange;        /* maxTapes-1 (Knuth's P) */
    MemoryContext sortcontext;    /* memory context holding most sort data */
    MemoryContext tuplecontext; /* sub-context of sortcontext for tuple data */
    LogicalTapeSet *tapeset;    /* logtape.c object for tapes in a temp file */
#ifdef PGXC
    ResponseCombiner *combiner; /* tuple source, alternate to tapeset */
#endif /* PGXC */

    /*
     * These function pointers decouple the routines that must know what kind
     * of tuple we are sorting from the routines that don't need to know it.
     * They are set up by the tuplesort_begin_xxx routines.
     *
     * Function to compare two tuples; result is per qsort() convention, ie:
     * <0, 0, >0 according as a<b, a=b, a>b.  The API must match
     * qsort_arg_comparator.
     */
    SortTupleComparator comparetup;

    /*
     * Function to copy a supplied input tuple into palloc'd space and set up
     * its SortTuple representation (ie, set tuple/datum1/isnull1).  Also,
     * state->availMem must be decreased by the amount of space used for the
     * tuple copy (note the SortTuple struct itself is not counted).
     */
    void        (*copytup) (Tuplesortstate *state, SortTuple *stup, void *tup);

    /*
     * Function to write a stored tuple onto tape.  The representation of the
     * tuple on tape need not be the same as it is in memory; requirements on
     * the tape representation are given below.  Unless the slab allocator is
     * used, after writing the tuple, pfree() the out-of-line data (not the
     * SortTuple struct!), and increase state->availMem by the amount of
     * memory space thereby released.
     */
    void        (*writetup) (Tuplesortstate *state, int tapenum,
                             SortTuple *stup);

    /*
     * Function to read a stored tuple from tape back into memory. 'len' is
     * the already-read length of the stored tuple.  The tuple is allocated
     * from the slab memory arena, or is palloc'd, see readtup_alloc().
     */
    void        (*readtup) (Tuplesortstate *state, SortTuple *stup,
                            int tapenum, unsigned int len);

#ifdef PGXC
    /*
     * Function to read length of next stored tuple.
     * Used as 'len' parameter for readtup function.
     */
    unsigned int (*getlen) (Tuplesortstate *state, int tapenum, bool eofOK);
#endif

    /*
     * This array holds the tuples now in sort memory.  If we are in state
     * INITIAL, the tuples are in no particular order; if we are in state
     * SORTEDINMEM, the tuples are in final sorted order; in states BUILDRUNS
     * and FINALMERGE, the tuples are organized in "heap" order per Algorithm
     * H.  In state SORTEDONTAPE, the array is not used.
     */
    SortTuple  *memtuples;        /* array of SortTuple structs */
    int            memtupcount;    /* number of tuples currently present */
    int            memtupsize;        /* allocated length of memtuples array */
    bool        growmemtuples;    /* memtuples' growth still underway? */

    /*
     * Memory for tuples is sometimes allocated using a simple slab allocator,
     * rather than with palloc().  Currently, we switch to slab allocation
     * when we start merging.  Merging only needs to keep a small, fixed
     * number of tuples in memory at any time, so we can avoid the
     * palloc/pfree overhead by recycling a fixed number of fixed-size slots
     * to hold the tuples.
     *
     * For the slab, we use one large allocation, divided into SLAB_SLOT_SIZE
     * slots.  The allocation is sized to have one slot per tape, plus one
     * additional slot.  We need that many slots to hold all the tuples kept
     * in the heap during merge, plus the one we have last returned from the
     * sort, with tuplesort_gettuple.
     *
     * Initially, all the slots are kept in a linked list of free slots.  When
     * a tuple is read from a tape, it is put to the next available slot, if
     * it fits.  If the tuple is larger than SLAB_SLOT_SIZE, it is palloc'd
     * instead.
     *
     * When we're done processing a tuple, we return the slot back to the free
     * list, or pfree() if it was palloc'd.  We know that a tuple was
     * allocated from the slab, if its pointer value is between
     * slabMemoryBegin and -End.
     *
     * When the slab allocator is used, the USEMEM/LACKMEM mechanism of
     * tracking memory usage is not used.
     */
    bool        slabAllocatorUsed;    /* is the slab allocator in use? */

    char       *slabMemoryBegin;    /* beginning of slab memory arena */
    char       *slabMemoryEnd;    /* end of slab memory arena */
    SlabSlot   *slabFreeHead;    /* head of free list */

    /* Buffer size to use for reading input tapes, during merge. */
    size_t        read_buffer_size;

    /*
     * When we return a tuple to the caller in tuplesort_gettuple_XXX, that
     * came from a tape (that is, in TSS_SORTEDONTAPE or TSS_FINALMERGE
     * modes), we remember the tuple in 'lastReturnedTuple', so that we can
     * recycle the memory on next gettuple call.
     */
    void       *lastReturnedTuple;

    /*
     * While building initial runs, this indicates if the replacement
     * selection strategy is in use.  When it isn't, then a simple hybrid
     * sort-merge strategy is in use instead (runs are quicksorted).
     */
    bool        replaceActive;

    /*
     * While building initial runs, this is the current output run number
     * (starting at RUN_FIRST).  Afterwards, it is the number of initial runs
     * we made.
     */
    int            currentRun;

    /*
     * Unless otherwise noted, all pointer variables below are pointers to
     * arrays of length maxTapes, holding per-tape data.
     */

    /*
     * This variable is only used during merge passes.  mergeactive[i] is true
     * if we are reading an input run from (actual) tape number i and have not
     * yet exhausted that run.
     */
    bool       *mergeactive;    /* active input run source? */

    /*
     * Variables for Algorithm D.  Note that destTape is a "logical" tape
     * number, ie, an index into the tp_xxx[] arrays.  Be careful to keep
     * "logical" and "actual" tape numbers straight!
     */
    int            Level;            /* Knuth's l */
    int            destTape;        /* current output tape (Knuth's j, less 1) */
    int           *tp_fib;            /* Target Fibonacci run counts (A[]) */
    int           *tp_runs;        /* # of real runs on each tape */
    int           *tp_dummy;        /* # of dummy runs for each tape (D[]) */
    int           *tp_tapenum;        /* Actual tape numbers (TAPE[]) */
    int            activeTapes;    /* # of active input tapes in merge pass */

    /*
     * These variables are used after completion of sorting to keep track of
     * the next tuple to return.  (In the tape case, the tape's current read
     * position is also critical state.)
     */
    int            result_tape;    /* actual tape number of finished output */
    int            current;        /* array index (only used if SORTEDINMEM) */
    bool        eof_reached;    /* reached EOF (needed for cursors) */

    /* markpos_xxx holds marked position for mark and restore */
    long        markpos_block;    /* tape block# (only used if SORTEDONTAPE) */
    int            markpos_offset; /* saved "current", or offset in tape block */
    bool        markpos_eof;    /* saved "eof_reached" */

    /*
     * The sortKeys variable is used by every case other than the hash index
     * case; it is set by tuplesort_begin_xxx.  tupDesc is only used by the
     * MinimalTuple and CLUSTER routines, though.
     */
    TupleDesc    tupDesc;        /* descriptor of the tuples being sorted */
    SortSupport sortKeys;        /* array of length nKeys */

    /*
     * This variable is shared by the single-key MinimalTuple case and the
     * Datum case (which both use qsort_ssup()).  Otherwise it's NULL.
     */
    SortSupport onlyKey;

    /*
     * Additional state for managing "abbreviated key" sortsupport routines
     * (which currently may be used by all cases except the hash index case).
     * Tracks the intervals at which the optimization's effectiveness is
     * tested.
     */
    int64        abbrevNext;        /* Tuple # at which to next check
                                 * applicability */

    /*
     * These variables are specific to the CLUSTER case; they are set by
     * tuplesort_begin_cluster.
     */
    IndexInfo  *indexInfo;        /* info about index being used for reference */
    EState       *estate;            /* for evaluating index expressions */

    /*
     * These variables are specific to the IndexTuple case; they are set by
     * tuplesort_begin_index_xxx and used only by the IndexTuple routines.
     */
    Relation    heapRel;        /* table the index is being built on */
    Relation    indexRel;        /* index being built */

    /* These are specific to the index_btree subcase: */
    bool        enforceUnique;    /* complain if we find duplicate tuples */

    /* These are specific to the index_hash subcase: */
    uint32        high_mask;        /* masks for sortable part of hash code */
    uint32        low_mask;
    uint32        max_buckets;    /* NOTE(review): presumably the hash index's
                                 * bucket limit; confirm at the point where
                                 * it is set */

    /*
     * These variables are specific to the Datum case; they are set by
     * tuplesort_begin_datum and used only by the DatumTuple routines.
     */
    Oid            datumType;        /* NOTE(review): presumably the type OID of
                                 * the Datums; confirm in
                                 * tuplesort_begin_datum */
    /* we need typelen in order to know how to copy the Datums. */
    int            datumTypeLen;

    /*
     * Resource snapshot for time of sort start.
     */
#ifdef TRACE_SORT
    PGRUsage    ru_start;
#endif
};

/*
 * Does 'tuple' point into the slab memory arena?
 *
 * True exactly when the address lies in the half-open range
 * [slabMemoryBegin, slabMemoryEnd).  Both macro arguments may be evaluated
 * more than once.
 */
#define IS_SLAB_SLOT(state, tuple) \
    (!((char *) (tuple) < (state)->slabMemoryBegin || \
       (char *) (tuple) >= (state)->slabMemoryEnd))

/*
 * Return the given tuple to the slab memory free list, or free it
 * if it was palloc'd.
 *
 * A pointer that lies inside the slab arena (per IS_SLAB_SLOT) is pushed onto
 * the head of state->slabFreeHead; any other pointer is handed to pfree().
 *
 * The 'tuple' argument is parenthesized before being cast, so an expression
 * such as "base + offset" is converted as a whole rather than having the
 * cast bind to its first operand only.  It is evaluated exactly once.  The
 * temporary has a deliberately unusual name so that it cannot capture a
 * caller's variable that appears in the argument.
 */
#define RELEASE_SLAB_SLOT(state, tuple) \
    do { \
        SlabSlot *release_slot_ = (SlabSlot *) (tuple); \
        \
        if (IS_SLAB_SLOT((state), release_slot_)) \
        { \
            release_slot_->nextfree = (state)->slabFreeHead; \
            (state)->slabFreeHead = release_slot_; \
        } \
        else \
            pfree(release_slot_); \
    } while(0)

/*
 * Shorthand for invoking the per-sort-kind callbacks stored in the
 * Tuplesortstate, and for the memory accounting kept in state->availMem.
 * Every macro here may evaluate 'state' more than once.
 */
#define COMPARETUP(state,a,b)    ((state)->comparetup((a), (b), (state)))
#define COPYTUP(state,stup,tup) ((state)->copytup((state), (stup), (tup)))
#define WRITETUP(state,tape,stup)    ((state)->writetup((state), (tape), (stup)))
#define READTUP(state,stup,tape,len) ((state)->readtup((state), (stup), (tape), (len)))
#ifdef PGXC
#define GETLEN(state,tape,eofOK) ((state)->getlen((state), (tape), (eofOK)))
#endif
/* Over budget?  Never true while the slab allocator is in use. */
#define LACKMEM(state)        (!((state)->availMem >= 0 || (state)->slabAllocatorUsed))
/* Charge 'amt' bytes against, or credit them back to, the memory budget. */
#define USEMEM(state,amt)    ((state)->availMem -= (amt))
#define FREEMEM(state,amt)    ((state)->availMem += (amt))

/*
 * NOTES about on-tape representation of tuples:
 *
 * We require the first "unsigned int" of a stored tuple to be the total size
 * on-tape of the tuple, including itself (so it is never zero; an all-zero
 * unsigned int is used to delimit runs).  The remainder of the stored tuple
 * may or may not match the in-memory representation of the tuple ---
 * any conversion needed is the job of the writetup and readtup routines.
 *
 * If state->randomAccess is true, then the stored representation of the
 * tuple must be followed by another "unsigned int" that is a copy of the
 * length --- so the total tape space used is actually sizeof(unsigned int)
 * more than the stored length value.  This allows read-backwards.  When
 * randomAccess is not true, the write/read routines may omit the extra
 * length word.
 *
 * writetup is expected to write both length words as well as the tuple
 * data.  When readtup is called, the tape is positioned just after the
 * front length word; readtup must read the tuple data and advance past
 * the back length word (if present).
 *
 * The write/read routines can make use of the tuple description data
 * stored in the Tuplesortstate record, if needed.  They are also expected
 * to adjust state->availMem by the amount of memory space (not tape space!)
 * released or consumed.  There is no error return from either writetup
 * or readtup; they should ereport() on failure.
 *
 *
 * NOTES about memory consumption calculations:
 *
 * We count space allocated for tuples against the workMem limit, plus
 * the space used by the variable-size memtuples array.  Fixed-size space
 * is not counted; it's small enough to not be interesting.
 *
 * Note that we count actual space used (as shown by GetMemoryChunkSpace)
 * rather than the originally-requested size.  This is important since
 * palloc can add substantial overhead.  It's not a complete answer since
 * we won't count any wasted space in palloc allocation blocks, but it's
 * a lot better than what we were doing before 7.3.  As of 9.6, a
 * separate memory context is used for caller passed tuples.  Resetting
 * it at certain key increments significantly ameliorates fragmentation.
 * Note that this places a responsibility on readtup and copytup routines
 * to use the right memory context for these tuples (and to not use the
 * reset context for anything whose lifetime needs to span multiple
 * external sort runs).
 */

/*
 * Read exactly 'len' bytes from the given tape into 'ptr'; report an ERROR
 * ("unexpected end of data") if LogicalTapeRead returns any other count.
 *
 * 'len' is evaluated only once: it is latched into a local size_t, which is
 * then used both for the read request and for the comparison with the
 * returned count.  Passing an expression with side effects is therefore safe.
 */
#define LogicalTapeReadExact(tapeset, tapenum, ptr, len) \
    do { \
        size_t      tape_read_len_ = (size_t) (len); \
        \
        if (LogicalTapeRead(tapeset, tapenum, ptr, tape_read_len_) != tape_read_len_) \
            elog(ERROR, "unexpected end of data"); \
    } while(0)


/*
 * Forward declarations of file-local routines.
 *
 * The first group has no sort-kind suffix in its names.  The later groups
 * are comparetup/copytup/writetup/readtup functions (signatures matching the
 * callback pointers in Tuplesortstate), one set per name suffix.
 */
static Tuplesortstate *tuplesort_begin_common(int workMem, bool randomAccess);
static void puttuple_common(Tuplesortstate *state, SortTuple *tuple);
static bool consider_abort_common(Tuplesortstate *state);
static bool useselection(Tuplesortstate *state);
static void inittapes(Tuplesortstate *state);
static void selectnewtape(Tuplesortstate *state);
static void init_slab_allocator(Tuplesortstate *state, int numSlots);
static void mergeruns(Tuplesortstate *state);
static void mergeonerun(Tuplesortstate *state);
static void beginmerge(Tuplesortstate *state);
static bool mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup);
static void dumptuples(Tuplesortstate *state, bool alltuples);
static void dumpbatch(Tuplesortstate *state, bool alltuples);
static void make_bounded_heap(Tuplesortstate *state);
static void sort_bounded_heap(Tuplesortstate *state);
static void tuplesort_sort_memtuples(Tuplesortstate *state);
static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple,
                      bool checkIndex);
static void tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple,
                           bool checkIndex);
static void tuplesort_heap_delete_top(Tuplesortstate *state, bool checkIndex);
static void reversedirection(Tuplesortstate *state);
static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK);
static void markrunend(Tuplesortstate *state, int tapenum);
static void *readtup_alloc(Tuplesortstate *state, Size tuplen);
/* _heap callbacks */
static int comparetup_heap(const SortTuple *a, const SortTuple *b,
                Tuplesortstate *state);
static void copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup);
static void writetup_heap(Tuplesortstate *state, int tapenum,
              SortTuple *stup);
static void readtup_heap(Tuplesortstate *state, SortTuple *stup,
             int tapenum, unsigned int len);
#ifdef PGXC
/* _datanode callbacks (PGXC builds only) */
static unsigned int getlen_datanode(Tuplesortstate *state, int tapenum,
                bool eofOK);
static void readtup_datanode(Tuplesortstate *state, SortTuple *stup,
                 int tapenum, unsigned int len);
#endif
/* _cluster callbacks */
static int comparetup_cluster(const SortTuple *a, const SortTuple *b,
                   Tuplesortstate *state);
static void copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup);
static void writetup_cluster(Tuplesortstate *state, int tapenum,
                 SortTuple *stup);
static void readtup_cluster(Tuplesortstate *state, SortTuple *stup,
                int tapenum, unsigned int len);
/* _index callbacks; btree and hash each have their own comparator */
static int comparetup_index_btree(const SortTuple *a, const SortTuple *b,
                       Tuplesortstate *state);
static int comparetup_index_hash(const SortTuple *a, const SortTuple *b,
                      Tuplesortstate *state);
static void copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup);
static void writetup_index(Tuplesortstate *state, int tapenum,
               SortTuple *stup);
static void readtup_index(Tuplesortstate *state, SortTuple *stup,
              int tapenum, unsigned int len);
/* _datum callbacks */
static int comparetup_datum(const SortTuple *a, const SortTuple *b,
                 Tuplesortstate *state);
static void copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup);
static void writetup_datum(Tuplesortstate *state, int tapenum,
               SortTuple *stup);
static void readtup_datum(Tuplesortstate *state, SortTuple *stup,
              int tapenum, unsigned int len);
static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);

/*
 * Special versions of qsort just for SortTuple objects.  qsort_tuple() sorts
 * any variant of SortTuples, using the appropriate comparetup function.
 * qsort_ssup() is specialized for the case where the comparetup function
 * reduces to ApplySortComparator(), that is single-key MinimalTuple sorts
 * and Datum sorts.
 */
#include "qsort_tuple.c"


/*
 *        tuplesort_begin_xxx
 *
 * Initialize for a tuple sort operation.
 *
 * After calling tuplesort_begin, the caller should call tuplesort_putXXX
 * zero or more times, then call tuplesort_performsort when all the tuples
 * have been supplied.  After performsort, retrieve the tuples in sorted
 * order by calling tuplesort_getXXX until it returns false/NULL.  (If random
 * access was requested, rescan, markpos, and restorepos can also be called.)
 * Call tuplesort_end to terminate the operation and release memory/disk space.
 *
 * Each variant of tuplesort_begin has a workMem parameter specifying the
 * maximum number of kilobytes of RAM to use before spilling data to disk.
 * (The normal value of this parameter is work_mem, but some callers use
 * other values.)  Each variant also has a randomAccess parameter specifying
 * whether the caller needs non-sequential access to the sort result.
 */

/*
 * tuplesort_begin_common
 *
 * Setup shared by all tuplesort_begin_xxx variants.  Creates the memory
 * contexts for the sort, allocates a zeroed Tuplesortstate inside the main
 * one, and initializes the fields that do not depend on what kind of object
 * is being sorted.
 *
 * workMem is the memory budget in kilobytes.  randomAccess is recorded in
 * the state.  The memory context that was current on entry is current again
 * on return.
 */
static Tuplesortstate *
tuplesort_begin_common(int workMem, bool randomAccess)
{
    Tuplesortstate *state;
    MemoryContext mainCxt;
    MemoryContext callerTupleCxt;
    MemoryContext savedCxt;

    /*
     * All data needed by the sort lives in this working context (or its
     * child), which is created under the caller's current context.
     */
    mainCxt = AllocSetContextCreate(CurrentMemoryContext,
                                    "TupleSort main",
                                    ALLOCSET_DEFAULT_SIZES);

    /*
     * Child context reserved for caller-passed tuples (e.g. IndexTuple).
     * Keeping them apart eases memory management, and resetting the context
     * at key points reduces fragmentation.  The memtuples array is
     * deliberately allocated in the parent instead, because there is no need
     * to free it early.
     */
    callerTupleCxt = AllocSetContextCreate(mainCxt,
                                           "Caller tuples",
                                           ALLOCSET_DEFAULT_SIZES);

    /*
     * The state struct itself goes into the per-sort context, so no separate
     * pfree() is needed for it at shutdown.
     */
    savedCxt = MemoryContextSwitchTo(mainCxt);
    state = (Tuplesortstate *) palloc0(sizeof(Tuplesortstate));

#ifdef TRACE_SORT
    if (trace_sort)
        pg_rusage_init(&state->ru_start);
#endif

    /* Memory contexts and memory budget (workMem is given in kilobytes) */
    state->sortcontext = mainCxt;
    state->tuplecontext = callerTupleCxt;
    state->allowedMem = workMem * (int64) 1024;
    state->availMem = state->allowedMem;

    /* Generic sort status */
    state->status = TSS_INITIAL;
    state->randomAccess = randomAccess;
    state->tuples = true;
    state->bounded = false;
    state->boundUsed = false;
    state->tapeset = NULL;
    state->currentRun = RUN_FIRST;
    state->result_tape = -1;    /* flag that result tape has not been formed */

    /*
     * Set up the initially empty in-memory tuple array.  Its starting size
     * must be more than ALLOCSET_SEPARATE_THRESHOLD; see comments in
     * grow_memtuples().
     */
    state->memtupcount = 0;
    state->memtupsize = Max(1024,
                            ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1);
    state->growmemtuples = true;
    state->slabAllocatorUsed = false;
    state->memtuples = (SortTuple *) palloc(state->memtupsize * sizeof(SortTuple));

    /* Charge the array against the budget; workMem must at least cover it */
    USEMEM(state, GetMemoryChunkSpace(state->memtuples));
    if (LACKMEM(state))
        elog(ERROR, "insufficient memory allowed for sort");

    /*
     * maxTapes, tapeRange, and the Algorithm D variables are left for
     * inittapes() to initialize, if needed.
     */

    MemoryContextSwitchTo(savedCxt);

    return state;
}

/*
 * tuplesort_begin_heap
 *
 * Start a sort whose comparisons are done by comparetup_heap, ordered by
 * nkeys columns.  For key i, attNums[i] is the column number,
 * sortOperators[i] the ordering operator, sortCollations[i] the collation
 * and nullsFirstFlags[i] the NULLS FIRST setting.  tupDesc is referenced,
 * not copied.
 */
Tuplesortstate *
tuplesort_begin_heap(TupleDesc tupDesc,
                     int nkeys, AttrNumber *attNums,
                     Oid *sortOperators, Oid *sortCollations,
                     bool *nullsFirstFlags,
                     int workMem, bool randomAccess)
{
    Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
    MemoryContext savedCxt = MemoryContextSwitchTo(state->sortcontext);
    int         keyno;

    AssertArg(nkeys > 0);

#ifdef TRACE_SORT
    if (trace_sort)
        elog(LOG,
             "begin tuple sort: nkeys = %d, workMem = %d, randomAccess = %c",
             nkeys, workMem, randomAccess ? 't' : 'f');
#endif

    state->nKeys = nkeys;

    TRACE_POSTGRESQL_SORT_START(HEAP_SORT,
                                false,    /* no unique check */
                                nkeys,
                                workMem,
                                randomAccess);

    /* Install the heap-tuple flavor of the per-tuple callbacks */
    state->comparetup = comparetup_heap;
    state->copytup = copytup_heap;
    state->writetup = writetup_heap;
    state->readtup = readtup_heap;
#ifdef PGXC
    state->getlen = getlen;
#endif

    state->tupDesc = tupDesc;    /* assume we need not copy tupDesc */
    state->abbrevNext = 10;

    /* One zeroed SortSupportData per sort column, filled in below */
    state->sortKeys = (SortSupport) palloc0(nkeys * sizeof(SortSupportData));

    for (keyno = 0; keyno < nkeys; keyno++)
    {
        SortSupport ssup = &state->sortKeys[keyno];

        AssertArg(attNums[keyno] != 0);
        AssertArg(sortOperators[keyno] != 0);

        ssup->ssup_cxt = CurrentMemoryContext;
        ssup->ssup_collation = sortCollations[keyno];
        ssup->ssup_nulls_first = nullsFirstFlags[keyno];
        ssup->ssup_attno = attNums[keyno];

        /*
         * Only the leading column is flagged as a candidate for the
         * abbreviation optimization.
         */
        ssup->abbreviate = (keyno == 0);

        PrepareSortSupportFromOrderingOp(sortOperators[keyno], ssup);
    }

    /*
     * With abbreviated keys, tie-breaker comparisons may be required, so the
     * "onlyKey" optimization is restricted to single-key sorts that ended up
     * without an abbreviation converter.  Typically the optimization only
     * pays off for pass-by-value types anyway, whereas abbreviation typically
     * only pays off for pass-by-reference types.
     */
    if (nkeys == 1 && state->sortKeys[0].abbrev_converter == NULL)
        state->onlyKey = state->sortKeys;

    MemoryContextSwitchTo(savedCxt);

    return state;
}

/*
 * tuplesort_begin_cluster
 *
 * Start a sort whose comparisons are done by comparetup_cluster, ordered
 * according to the btree index indexRel.  tupDesc is referenced, not copied.
 */
Tuplesortstate *
tuplesort_begin_cluster(TupleDesc tupDesc,
                        Relation indexRel,
                        int workMem, bool randomAccess)
{
    Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
    MemoryContext savedCxt;
    ScanKey     btKeys;
    int         keyno;

    Assert(indexRel->rd_rel->relam == BTREE_AM_OID);

    savedCxt = MemoryContextSwitchTo(state->sortcontext);

#ifdef TRACE_SORT
    if (trace_sort)
        elog(LOG,
             "begin tuple sort: nkeys = %d, workMem = %d, randomAccess = %c",
             RelationGetNumberOfAttributes(indexRel),
             workMem, randomAccess ? 't' : 'f');
#endif

    state->nKeys = RelationGetNumberOfAttributes(indexRel);

    TRACE_POSTGRESQL_SORT_START(CLUSTER_SORT,
                                false,    /* no unique check */
                                state->nKeys,
                                workMem,
                                randomAccess);

    /* Install the CLUSTER flavor of the per-tuple callbacks */
    state->comparetup = comparetup_cluster;
    state->copytup = copytup_cluster;
    state->writetup = writetup_cluster;
    state->readtup = readtup_cluster;
#ifdef PGXC
    state->getlen = getlen;
#endif
    state->abbrevNext = 10;

    state->indexInfo = BuildIndexInfo(indexRel);

    state->tupDesc = tupDesc;    /* assume we need not copy tupDesc */

    btKeys = _bt_mkscankey_nodata(indexRel);

    if (state->indexInfo->ii_Expressions != NULL)
    {
        /*
         * Index expressions will have to be evaluated with FormIndexDatum.
         * That takes an EState plus a TupleTableSlot to hold the table
         * tuples, and the per-tuple econtext's scantuple must point at that
         * same slot.  tuplesort_end() releases both.
         */
        TupleTableSlot *scanSlot;
        ExprContext *perTupleCxt;

        state->estate = CreateExecutorState();
        scanSlot = MakeSingleTupleTableSlot(tupDesc);
        perTupleCxt = GetPerTupleExprContext(state->estate);
        perTupleCxt->ecxt_scantuple = scanSlot;
    }

    /* One zeroed SortSupportData per index column, filled in below */
    state->sortKeys = (SortSupport) palloc0(state->nKeys *
                                            sizeof(SortSupportData));

    for (keyno = 0; keyno < state->nKeys; keyno++)
    {
        SortSupport ssup = &state->sortKeys[keyno];
        ScanKey     btKey = &btKeys[keyno];
        int16       strategy;

        ssup->ssup_cxt = CurrentMemoryContext;
        ssup->ssup_collation = btKey->sk_collation;
        ssup->ssup_nulls_first =
            (btKey->sk_flags & SK_BT_NULLS_FIRST) != 0;
        ssup->ssup_attno = btKey->sk_attno;

        /*
         * Only the leading column is flagged as a candidate for the
         * abbreviation optimization.
         */
        ssup->abbreviate = (keyno == 0);

        AssertState(ssup->ssup_attno != 0);

        /* A DESC index column sorts with the "greater than" strategy */
        if ((btKey->sk_flags & SK_BT_DESC) != 0)
            strategy = BTGreaterStrategyNumber;
        else
            strategy = BTLessStrategyNumber;

        PrepareSortSupportFromIndexRel(indexRel, strategy, ssup);
    }

    /* The scan keys were only needed to derive the sort support data */
    _bt_freeskey(btKeys);

    MemoryContextSwitchTo(savedCxt);

    return state;
}

/*
 * tuplesort_begin_index_btree
 *
 * Start a sort of index tuples ordered according to the btree index
 * indexRel; comparisons are done by comparetup_index_btree.  heapRel,
 * indexRel and enforceUnique are recorded in the sort state for later use.
 */
Tuplesortstate *
tuplesort_begin_index_btree(Relation heapRel,
                            Relation indexRel,
                            bool enforceUnique,
                            int workMem, bool randomAccess)
{
    Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
    ScanKey        indexScanKey;
    MemoryContext oldcontext;
    int            i;

    oldcontext = MemoryContextSwitchTo(state->sortcontext);

#ifdef TRACE_SORT
    if (trace_sort)
        elog(LOG,
             "begin index sort: unique = %c, workMem = %d, randomAccess = %c",
             enforceUnique ? 't' : 'f',
             workMem, randomAccess ? 't' : 'f');
#endif

    /*
     * One sort key per index column.  This is set exactly once; it is used
     * both by the trace probe below and to size the sortKeys array.
     */
    state->nKeys = RelationGetNumberOfAttributes(indexRel);

    TRACE_POSTGRESQL_SORT_START(INDEX_SORT,
                                enforceUnique,
                                state->nKeys,
                                workMem,
                                randomAccess);

    /* Install the btree-index flavor of the per-tuple callbacks */
    state->comparetup = comparetup_index_btree;
    state->copytup = copytup_index;
    state->writetup = writetup_index;
    state->readtup = readtup_index;
#ifdef PGXC
    state->getlen = getlen;
#endif
    state->abbrevNext = 10;

    state->heapRel = heapRel;
    state->indexRel = indexRel;
    state->enforceUnique = enforceUnique;

    indexScanKey = _bt_mkscankey_nodata(indexRel);

    /* Prepare SortSupport data for each column */
    state->sortKeys = (SortSupport) palloc0(state->nKeys *
                                            sizeof(SortSupportData));

    for (i = 0; i < state->nKeys; i++)
    {
        SortSupport sortKey = state->sortKeys + i;
        ScanKey        scanKey = indexScanKey + i;
        int16        strategy;

        sortKey->ssup_cxt = CurrentMemoryContext;
        sortKey->ssup_collation = scanKey->sk_collation;
        sortKey->ssup_nulls_first =
            (scanKey->sk_flags & SK_BT_NULLS_FIRST) != 0;
        sortKey->ssup_attno = scanKey->sk_attno;
        /* Convey if abbreviation optimization is applicable in principle */
        sortKey->abbreviate = (i == 0);

        AssertState(sortKey->ssup_attno != 0);

        /* A DESC index column sorts with the "greater than" strategy */
        strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ?
            BTGreaterStrategyNumber : BTLessStrategyNumber;

        PrepareSortSupportFromIndexRel(indexRel, strategy, sortKey);
    }

    /* The scan keys were only needed to derive the sort support data */
    _bt_freeskey(indexScanKey);

    MemoryContextSwitchTo(oldcontext);

    return state;
}

/*
 * tuplesort_begin_index_hash
 *
 * Start a sort of index tuples for a hash index; comparisons are done by
 * comparetup_index_hash.  heapRel, indexRel and the three mask/bucket values
 * are recorded in the sort state.
 *
 * Note that no SortSupport data is built here, so state->sortKeys keeps the
 * NULL it got from the zeroed allocation in tuplesort_begin_common().
 */
Tuplesortstate *
tuplesort_begin_index_hash(Relation heapRel,
                           Relation indexRel,
                           uint32 high_mask,
                           uint32 low_mask,
                           uint32 max_buckets,
                           int workMem, bool randomAccess)
{
    Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
    MemoryContext savedCxt = MemoryContextSwitchTo(state->sortcontext);

#ifdef TRACE_SORT
    if (trace_sort)
        elog(LOG,
             "begin index sort: high_mask = 0x%x, low_mask = 0x%x, "
             "max_buckets = 0x%x, workMem = %d, randomAccess = %c",
             high_mask,
             low_mask,
             max_buckets,
             workMem, randomAccess ? 't' : 'f');
#endif

    /* The hash code is the one and only sort column */
    state->nKeys = 1;

    /* Install the hash-index flavor of the per-tuple callbacks */
    state->comparetup = comparetup_index_hash;
    state->copytup = copytup_index;
    state->writetup = writetup_index;
    state->readtup = readtup_index;
#ifdef PGXC
    state->getlen = getlen;
#endif

    /* Remember the relations involved */
    state->heapRel = heapRel;
    state->indexRel = indexRel;

    /* Remember the caller-supplied mask and bucket values */
    state->high_mask = high_mask;
    state->low_mask = low_mask;
    state->max_buckets = max_buckets;

    MemoryContextSwitchTo(savedCxt);

    return state;
}

/*
 * tuplesort_begin_datum
 *
 * Start a sort of single Datums of type datumType, ordered by sortOperator
 * under sortCollation, with NULLs first if nullsFirstFlag is set.
 */
Tuplesortstate *
tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation,
                      bool nullsFirstFlag,
                      int workMem, bool randomAccess)
{
    Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
    MemoryContext savedCxt = MemoryContextSwitchTo(state->sortcontext);
    SortSupport ssup;
    int16       typeLen;
    bool        byValue;

#ifdef TRACE_SORT
    if (trace_sort)
        elog(LOG,
             "begin datum sort: workMem = %d, randomAccess = %c",
             workMem, randomAccess ? 't' : 'f');
#endif

    /* A datum sort always has exactly one sort column */
    state->nKeys = 1;

    TRACE_POSTGRESQL_SORT_START(DATUM_SORT,
                                false,    /* no unique check */
                                1,
                                workMem,
                                randomAccess);

    /* Install the Datum flavor of the per-tuple callbacks */
    state->comparetup = comparetup_datum;
    state->copytup = copytup_datum;
    state->writetup = writetup_datum;
    state->readtup = readtup_datum;
#ifdef PGXC
    state->getlen = getlen;
#endif
    state->abbrevNext = 10;

    state->datumType = datumType;

    /*
     * Look up the length and by-value property of the datum type.  Only
     * by-reference types ever get separate storage in SortTuple.tuple.
     */
    get_typlenbyval(datumType, &typeLen, &byValue);
    state->datumTypeLen = typeLen;
    state->tuples = !byValue;

    /* The single sort key */
    ssup = (SortSupport) palloc0(sizeof(SortSupportData));
    state->sortKeys = ssup;

    ssup->ssup_cxt = CurrentMemoryContext;
    ssup->ssup_collation = sortCollation;
    ssup->ssup_nulls_first = nullsFirstFlag;

    /*
     * Abbreviation is possible here only for by-reference types.  In theory,
     * a pass-by-value datatype could have an abbreviated form that is cheaper
     * to compare.  In a tuple sort, we could support that, because we can
     * always extract the original datum from the tuple if needed.  Here, we
     * can't, because a datum sort only stores a single copy of the datum; the
     * "tuple" field of each sortTuple is NULL.
     */
    ssup->abbreviate = !byValue;

    PrepareSortSupportFromOrderingOp(sortOperator, ssup);

    /*
     * With abbreviated keys, tie-breaker comparisons may be required, so the
     * "onlyKey" optimization is used only when no abbreviation converter was
     * set up.  Typically the optimization only pays off for pass-by-value
     * types anyway, whereas abbreviation typically only pays off for
     * pass-by-reference types.
     */
    if (ssup->abbrev_converter == NULL)
        state->onlyKey = ssup;

    MemoryContextSwitchTo(savedCxt);

    return state;
}

#ifdef PGXC
/*
 * Begin a merge-only sort: the tuples arrive from sources that have each
 * already sorted their output.  This is much like sorting heap tuples,
 * except that there is no need to load the sorter.  The sorter's initial
 * status is final merge, and the readtup and getlen callbacks appropriate
 * for reading those sorted streams are installed here.
 *
 * Usage pattern of the merge sorter:
 *
 *     tuplesort_begin_merge
 *     while (tuple = tuplesort_gettuple())
 *     {
 *         // process
 *     }
 *     tuplesort_end_merge
 */
Tuplesortstate *
tuplesort_begin_merge(TupleDesc tupDesc,
                     int nkeys, AttrNumber *attNums,
                     Oid *sortOperators, Oid *sortCollations, bool *nullsFirstFlags,
                     ResponseCombiner *combiner,
                     int workMem)
{
    /* The common setup is always asked for a non-random-access sort */
    Tuplesortstate *state = tuplesort_begin_common(workMem, false);
    MemoryContext savedCxt = MemoryContextSwitchTo(state->sortcontext);
    int         keyno;
    int         tapeno;

    AssertArg(nkeys > 0);
    AssertArg(combiner);

#ifdef TRACE_SORT
    if (trace_sort)
        elog(LOG,
             "begin merge sort: nkeys = %d, workMem = %d", nkeys, workMem);
#endif

    state->nKeys = nkeys;

    TRACE_POSTGRESQL_SORT_START(MERGE_SORT,
                                false,    /* no unique check */
                                nkeys,
                                workMem,
                                false);

    /*
     * Tuples are compared like heap tuples but are only ever read from the
     * combiner, so no copy or write callbacks are installed.
     */
    state->combiner = combiner;
    state->comparetup = comparetup_heap;
    state->copytup = NULL;
    state->writetup = NULL;
    state->readtup = readtup_datanode;
    state->getlen = getlen_datanode;

    state->tuples = false;

    state->tupDesc = tupDesc;    /* assume we need not copy tupDesc */

    /* One zeroed SortSupportData per sort column, filled in below */
    state->sortKeys = (SortSupport) palloc0(nkeys * sizeof(SortSupportData));

    for (keyno = 0; keyno < nkeys; keyno++)
    {
        SortSupport ssup = &state->sortKeys[keyno];

        AssertArg(attNums[keyno] != 0);
        AssertArg(sortOperators[keyno] != 0);

        ssup->ssup_cxt = CurrentMemoryContext;
        ssup->ssup_collation = sortCollations[keyno];
        ssup->ssup_nulls_first = nullsFirstFlags[keyno];
        ssup->ssup_attno = attNums[keyno];

        PrepareSortSupportFromOrderingOp(sortOperators[keyno], ssup);
    }

    /*
     * Each sorted input stream plays the role of a logical tape, so the tape
     * bookkeeping is sized by the combiner's connection count.
     */
    state->maxTapes = combiner->conn_count;
    state->tapeRange = combiner->conn_count;

    state->mergeactive = (bool *) palloc0(combiner->conn_count * sizeof(bool));
    state->tp_runs = (int *) palloc0(combiner->conn_count * sizeof(int));
    state->tp_dummy = (int *) palloc0(combiner->conn_count * sizeof(int));
    state->tp_tapenum = (int *) palloc0(combiner->conn_count * sizeof(int));

    /* Every stream (tape) holds exactly one run and maps to itself */
    for (tapeno = 0; tapeno < combiner->conn_count; tapeno++)
    {
        state->tp_tapenum[tapeno] = tapeno;
        state->tp_runs[tapeno] = 1;
    }

    init_slab_allocator(state, 0);

    /* Start merging right away; the sorter begins life in final-merge state */
    beginmerge(state);
    state->status = TSS_FINALMERGE;

    MemoryContextSwitchTo(savedCxt);

    return state;
}
#endif

/*
 * tuplesort_set_bound
 *
 *    Advise tuplesort that at most the first N result tuples are required.
 *
 * Must be called before inserting any tuples.  (Actually, we could allow it
 * as long as the sort hasn't spilled to disk, but there seems no need for
 * delayed calls at the moment.)
 *
 * This is a hint only. The tuplesort may still return more tuples than
 * requested.
 */
void
tuplesort_set_bound(Tuplesortstate *state, int64 bound)
{
    /* Assert we're called before loading any tuples */
    Assert(state->status == TSS_INITIAL);
    Assert(state->memtupcount == 0);
    Assert(!state->bounded);

#ifdef DEBUG_BOUNDED_SORT
    /* Honor GUC setting that disables the feature (for easy testing) */
    if (!optimize_bounded_sort)
        return;
#endif

    /*
     * We want to be able to compute bound * 2, so limit the setting.  A
     * larger request is silently ignored and the sort stays unbounded.
     */
    if (bound > (int64) (INT_MAX / 2))
        return;

    state->bounded = true;
    state->bound = (int) bound;

    /*
     * Not every kind of sort has SortSupport data: tuplesort_begin_index_hash
     * leaves sortKeys NULL.  In that case there is no abbreviation state to
     * reset, and we must not dereference the pointer.
     */
    if (state->sortKeys == NULL)
        return;

    /*
     * Bounded sorts are not an effective target for abbreviated key
     * optimization.  Disable by setting state to be consistent with no
     * abbreviation support.
     */
    state->sortKeys->abbrev_converter = NULL;
    if (state->sortKeys->abbrev_full_comparator)
        state->sortKeys->comparator = state->sortKeys->abbrev_full_comparator;

    /* Not strictly necessary, but be tidy */
    state->sortKeys->abbrev_abort = NULL;
    state->sortKeys->abbrev_full_comparator = NULL;
}

/*
 * tuplesort_end
 *
 *    Release resources and clean up.
 *
 * NOTE: after calling this, any pointers returned by tuplesort_getXXX are
 * pointing to garbage.  Be careful not to attempt to use or free such
 * pointers afterwards!
 */
void
tuplesort_end(Tuplesortstate *state)
{
    /* Probably unnecessary, but run the cleanup inside the sort's context */
    MemoryContext savedCxt = MemoryContextSwitchTo(state->sortcontext);

#ifdef TRACE_SORT
    long        space_used;

    /* Disk blocks for an external sort, else kilobytes (rounded up) */
    if (state->tapeset == NULL)
        space_used = (state->allowedMem - state->availMem + 1023) / 1024;
    else
        space_used = LogicalTapeSetBlocks(state->tapeset);
#endif

    /*
     * Get rid of the temporary "tape" files, if there are any.
     *
     * This is done between the two #ifdef TRACE_SORT sections so that its
     * cost is included in the total reported for the sort.
     */
    if (state->tapeset)
        LogicalTapeSetClose(state->tapeset);

#ifdef TRACE_SORT
    if (trace_sort)
    {
        if (state->tapeset)
            elog(LOG, "external sort ended, %ld disk blocks used: %s",
                 space_used, pg_rusage_show(&state->ru_start));
        else
            elog(LOG, "internal sort ended, %ld KB used: %s",
                 space_used, pg_rusage_show(&state->ru_start));
    }

    TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, space_used);
#else

    /*
     * Without TRACE_SORT the sort__done probe still fires, but it carries no
     * space-used statistics.
     */
    TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, 0L);
#endif

    /* Release the executor state that only the CLUSTER case creates */
    if (state->estate != NULL)
    {
        ExprContext *perTupleCxt = GetPerTupleExprContext(state->estate);

        ExecDropSingleTupleTableSlot(perTupleCxt->ecxt_scantuple);
        FreeExecutorState(state->estate);
    }

    MemoryContextSwitchTo(savedCxt);

    /*
     * Deleting the per-sort context releases all working memory in one go,
     * the Tuplesortstate struct itself included.
     */
    MemoryContextDelete(state->sortcontext);
}

/*
 * Grow the memtuples[] array, if possible within our memory constraint.  We
 * must not exceed INT_MAX tuples in memory or the caller-provided memory
 * limit.  Return TRUE if we were able to enlarge the array, FALSE if not.
 *
 * Normally, at each increment we double the size of the array.  When doing
 * that would exceed a limit, we attempt one last, smaller increase (and then
 * clear the growmemtuples flag so we don't try any more).  That allows us to
 * use memory as fully as permitted; sticking to the pure doubling rule could
 * result in almost half going unused.  Because availMem moves around with
 * tuple addition/removal, we need some rule to prevent making repeated small
 * increases in memtupsize, which would just be useless thrashing.  The
 * growmemtuples flag accomplishes that and also prevents useless
 * recalculations in this function.
 */
static bool
grow_memtuples(Tuplesortstate *state)
{// #lizard forgives
    int         oldSize = state->memtupsize;
    int64       usedNow = state->allowedMem - state->availMem;
    int         newSize;

    /* Once growth has been shut off, never try again */
    if (!state->growmemtuples)
        return false;

    /* Choose the new array size */
    if (usedNow > state->availMem)
    {
        /*
         * More than half of allowedMem is in use, so this is the final
         * increment: give up on doubling and grow as far as is safe.
         *
         * Staying within allowedMem caps the increase at availMem /
         * sizeof(SortTuple) elements, but we want considerably less than
         * that, since room must remain for the tuples the new slots will
         * point to.  Assuming future tuples are about as large as those seen
         * so far, we scale the array by the ratio of allowed to used memory.
         * The ideal value may be higher or lower; it's hard to know that in
         * advance.  The result is clamped at INT_MAX tuples.
         *
         * This cannot by itself make LACKMEM true: the memory now in use
         * already includes the present array, so allowedMem would suffice
         * for the new elements even if nothing else were in use.
         *
         * The arithmetic is done in float8 because the product of memtupsize
         * and allowedMem could overflow otherwise.  Any inaccuracy should be
         * insignificant, and even an insane result is caught by the checks
         * further down.
         */
        double      ratio;

        ratio = (double) state->allowedMem / (double) usedNow;
        if (oldSize * ratio < INT_MAX)
            newSize = (int) (oldSize * ratio);
        else
            newSize = INT_MAX;

        /* No further enlargement attempts after this one */
        state->growmemtuples = false;
    }
    else if (oldSize < INT_MAX / 2)
    {
        /* At most half of allowedMem is in use: simply double */
        newSize = oldSize * 2;
    }
    else
    {
        /* Doubling would pass INT_MAX tuples; clamp and stop growing */
        newSize = INT_MAX;
        state->growmemtuples = false;
    }

    /* The array must gain at least one element, else report failure */
    if (newSize <= oldSize)
    {
        state->growmemtuples = false;
        return false;
    }

    /*
     * On a 32-bit machine, allowedMem could exceed MaxAllocHugeSize, so clamp
     * to keep the request from being rejected.  Address space could easily
     * run out before that point.  (guc.c's MAX_KILOBYTES limit on work_mem
     * presently makes this impossible, but we don't rely on that at this
     * distance.)
     */
    if ((Size) newSize >= MaxAllocHugeSize / sizeof(SortTuple))
    {
        newSize = (int) (MaxAllocHugeSize / sizeof(SortTuple));
        state->growmemtuples = false;    /* can't grow any more */
    }

    /*
     * LACKMEM must not become true as a result of growing, or the space
     * management algorithm will go nuts.  The code above should never produce
     * a dangerous request, but check explicitly that the growth fits within
     * availMem.  (LACKMEM could still result if the chunk overhead of the
     * memtuples array increased.  That shouldn't happen, because the initial
     * array size was chosen large enough that palloc treats both old and new
     * arrays as separate chunks; LACKMEM is rechecked below just in case.)
     */
    if (state->availMem < (int64) ((newSize - oldSize) * sizeof(SortTuple)))
    {
        /* We didn't realloc, so shut off future attempts too */
        state->growmemtuples = false;
        return false;
    }

    /* Swap the array's old accounting for the new while enlarging it */
    FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
    state->memtupsize = newSize;
    state->memtuples = (SortTuple *)
        repalloc_huge(state->memtuples,
                      state->memtupsize * sizeof(SortTuple));
    USEMEM(state, GetMemoryChunkSpace(state->memtuples));
    if (LACKMEM(state))
        elog(ERROR, "unexpected out-of-memory situation in tuplesort");

    return true;
}

/*
 * Accept one tuple while collecting input data for sort.
 *
 * Note that the input data is always copied; the caller need not save it.
 */
void
tuplesort_puttupleslot(Tuplesortstate *state, TupleTableSlot *slot)
{
    SortTuple   stup;
    MemoryContext savedCxt;

    savedCxt = MemoryContextSwitchTo(state->sortcontext);

    /*
     * Have the sort's copy routine copy the slot's tuple into memory we
     * control (decreasing availMem), then hand the result to the common
     * code.
     */
    COPYTUP(state, &stup, (void *) slot);
    puttuple_common(state, &stup);

    MemoryContextSwitchTo(savedCxt);
}

/*
 * Accept one tuple while collecting input data for sort.
 *
 * Note that the input data is always copied; the caller need not save it.
 */
void
tuplesort_putheaptuple(Tuplesortstate *state, HeapTuple tup)
{
    SortTuple   stup;
    MemoryContext savedCxt;

    savedCxt = MemoryContextSwitchTo(state->sortcontext);

    /*
     * Have the sort's copy routine copy the heap tuple into memory we
     * control (decreasing availMem), then hand the result to the common
     * code.
     */
    COPYTUP(state, &stup, (void *) tup);
    puttuple_common(state, &stup);

    MemoryContextSwitchTo(savedCxt);
}

/*
 * Collect one index tuple while collecting input data for sort, building
 * it from caller-supplied values.
 */
void
tuplesort_putindextuplevalues(Tuplesortstate *state, Relation rel,
                              ItemPointer self, Datum *values,
                              bool *isnull)
{
    MemoryContext savedCxt = MemoryContextSwitchTo(state->tuplecontext);
    SortTuple   stup;
    IndexTuple  itup;
    Datum       firstKey;

    /* Form the index tuple in the caller-tuple context and account for it */
    itup = index_form_tuple(RelationGetDescr(rel), values, isnull);
    itup->t_tid = *self;
    stup.tuple = itup;
    USEMEM(state, GetMemoryChunkSpace(stup.tuple));

    /* Fetch the first-column key value (and its null flag) */
    firstKey = index_getattr(itup,
                             1,
                             RelationGetDescr(state->indexRel),
                             &stup.isnull1);

    MemoryContextSwitchTo(state->sortcontext);

    if (state->sortKeys != NULL &&
        state->sortKeys->abbrev_converter != NULL &&
        !stup.isnull1)
    {
        if (!consider_abort_common(state))
        {
            /* Abbreviation stays in effect: store the abbreviated key */
            stup.datum1 = state->sortKeys->abbrev_converter(firstKey,
                                                            state->sortKeys);
        }
        else
        {
            /* Abbreviation is being aborted */
            int         slot;

            stup.datum1 = firstKey;

            /*
             * Make the state look as if abbreviation had never been tried.
             *
             * The current tuple was just handled; the tuples copied earlier
             * get their datum1 replaced by the ordinary representation so
             * that all of them agree.  Tuples already dumped to tape don't
             * matter, since serialized tuples lack abbreviated keys (and
             * TSS_BUILDRUNS state prevents control reaching here in any
             * case).
             */
            for (slot = 0; slot < state->memtupcount; slot++)
            {
                SortTuple  *mtup = &state->memtuples[slot];
                IndexTuple  memItup = mtup->tuple;

                mtup->datum1 = index_getattr(memItup,
                                             1,
                                             RelationGetDescr(state->indexRel),
                                             &mtup->isnull1);
            }
        }
    }
    else
    {
        /*
         * No sort keys, no converter, or a NULL value: store the ordinary
         * Datum representation.  A converter won't expect NULL values, and
         * the cost model is not required to account for NULL, so the
         * converter is not called for them; datum1 is meant to be the zeroed
         * representation in that case (to be consistent, and to support
         * cheap inequality tests for NULL abbreviated keys).
         */
        stup.datum1 = firstKey;
    }

    puttuple_common(state, &stup);

    MemoryContextSwitchTo(savedCxt);
}

/*
 * Accept one Datum while collecting input data for sort.
 *
 * If the Datum is pass-by-ref type, the value will be copied.
 */
void
tuplesort_putdatum(Tuplesortstate *state, Datum val, bool isNull)
{
    MemoryContext savedCxt = MemoryContextSwitchTo(state->tuplecontext);
    SortTuple   stup;

    /*
     * Non-null pass-by-reference values are copied into memory we control
     * and possibly abbreviated.  stup.tuple points to the copy, which is the
     * canonical one (e.g. to return via tuplesort_getdatum or when writing
     * to tape); stup.datum1 holds the abbreviated value if abbreviation is
     * happening, and otherwise is identical to stup.tuple.
     *
     * Pass-by-value types and null values are stored directly in stup.datum1
     * with stup.tuple set to NULL.
     */
    if (!isNull && state->tuples)
    {
        Datum       copied = datumCopy(val, false, state->datumTypeLen);

        stup.isnull1 = false;
        stup.tuple = DatumGetPointer(copied);
        USEMEM(state, GetMemoryChunkSpace(stup.tuple));
        MemoryContextSwitchTo(state->sortcontext);

        if (state->sortKeys->abbrev_converter == NULL)
        {
            /* No abbreviation in effect */
            stup.datum1 = copied;
        }
        else if (consider_abort_common(state))
        {
            /* Abbreviation is being aborted */
            int         slot;

            stup.datum1 = copied;

            /*
             * Make the state look as if abbreviation had never been tried.
             *
             * The current tuple was just handled; the tuples copied earlier
             * get their datum1 replaced by a pointer to their own copy so
             * that all of them agree.  Tuples already dumped to tape don't
             * matter, since serialized tuples lack abbreviated keys (and
             * TSS_BUILDRUNS state prevents control reaching here in any
             * case).
             */
            for (slot = 0; slot < state->memtupcount; slot++)
            {
                SortTuple  *mtup = &state->memtuples[slot];

                mtup->datum1 = PointerGetDatum(mtup->tuple);
            }
        }
        else
        {
            /* Abbreviation stays in effect: store the abbreviated key */
            stup.datum1 = state->sortKeys->abbrev_converter(copied,
                                                            state->sortKeys);
        }
    }
    else
    {
        /*
         * NULLs get a zeroed datum1 (to be consistent, and to support cheap
         * inequality tests for NULL abbreviated keys); by-value datums are
         * stored as they are.
         */
        stup.datum1 = isNull ? (Datum) 0 : val;
        stup.isnull1 = isNull;
        stup.tuple = NULL;        /* no separate storage */
        MemoryContextSwitchTo(state->sortcontext);
    }

    puttuple_common(state, &stup);

    MemoryContextSwitchTo(savedCxt);
}

/*
 * puttuple_common - back end shared by the tuple and datum "put" routines.
 *
 * The caller hands over an already-built SortTuple; what happens to it
 * depends on the phase the sort is in:
 *
 *    TSS_INITIAL        append to memtuples[], then possibly switch to a
 *                    bounded heap or to tape-based sorting
 *    TSS_BOUNDED        keep the tuple only if it displaces the heap top
 *    TSS_BUILDRUNS    add to memtuples[] and dump to tape when over budget
 *
 * Any other status raises an error.
 */
static void
puttuple_common(Tuplesortstate *state, SortTuple *tuple)
{// #lizard forgives
    if (state->status == TSS_INITIAL)
    {
        /*
         * Try to enlarge the array while one free slot is still left.  If
         * the enlargement does not happen, that last slot still takes the
         * incoming tuple, and the checks further down move us over to
         * tape-based operation.
         */
        if (state->memtupcount >= state->memtupsize - 1)
        {
            (void) grow_memtuples(state);
            Assert(state->memtupcount < state->memtupsize);
        }
        state->memtuples[state->memtupcount] = *tuple;
        state->memtupcount++;

        /*
         * For a bounded sort, decide whether a bounded heapsort should take
         * over: either we hold more than twice the requested number of
         * tuples (the heuristic point where a heap beats quicksort), or
         * workMem is exhausted and we already hold more than the bound.
         *
         * Once in TSS_BOUNDED we stay there; if later tuples are larger
         * than earlier ones, workMem may end up significantly exceeded.
         */
        if (state->bounded)
        {
            if (state->memtupcount > state->bound * 2 ||
                (state->memtupcount > state->bound && LACKMEM(state)))
            {
#ifdef TRACE_SORT
                if (trace_sort)
                    elog(LOG, "switching to bounded heapsort at %d tuples: %s",
                         state->memtupcount,
                         pg_rusage_show(&state->ru_start));
#endif
                make_bounded_heap(state);
                return;
            }
        }

        /*
         * Out of array slots or out of memory: set up the tapes and dump
         * tuples until we are back under the limit.  Otherwise there is
         * nothing more to do for this tuple.
         */
        if (state->memtupcount >= state->memtupsize || LACKMEM(state))
        {
            inittapes(state);
            dumptuples(state, false);
        }
        return;
    }
    else if (state->status == TSS_BOUNDED)
    {
        /*
         * The array must not grow here, so compare against the heap top
         * before storing anything.  The sort direction is currently
         * reversed, hence only a tuple that compares strictly greater than
         * the top is kept.  With far more input than the bound, most tuples
         * are rejected by this single comparison.
         */
        if (COMPARETUP(state, tuple, &state->memtuples[0]) > 0)
        {
            /* new tuple wins: drop the heap top and put the newcomer in */
            free_sort_tuple(state, &state->memtuples[0]);
            tuple->tupindex = 0;    /* not used */
            tuplesort_heap_replace_top(state, tuple, false);
        }
        else
        {
            /* new tuple <= heap top: it can never be output, discard it */
            free_sort_tuple(state, tuple);
            CHECK_FOR_INTERRUPTS();
        }
    }
    else if (state->status == TSS_BUILDRUNS)
    {
        /*
         * While replacement selection is active (only ever during
         * RUN_FIRST), a tuple that is >= the first not-yet-output tuple can
         * still join the current, heapified run.  Everything else is simply
         * appended and tagged HEAP_RUN_NEXT; dumptuples() does not assume
         * such tuples are heapified and quicksorts them later.  When
         * replacement selection is not in use, every tuple takes the append
         * path.
         *
         * There should always be room to store the incoming tuple.
         */
        Assert(!state->replaceActive || state->memtupcount > 0);

        if (!state->replaceActive ||
            COMPARETUP(state, tuple, &state->memtuples[0]) < 0)
        {
            /* not part of the heapified first run: append as-is */
            tuple->tupindex = HEAP_RUN_NEXT;
            state->memtuples[state->memtupcount++] = *tuple;
        }
        else
        {
            /*
             * Belongs to the heapified first run.  The single COMPARETUP()
             * call above is the only comparison needed to classify the
             * tuple; next-run tuples are never compared during sifting.
             */
            Assert(state->currentRun == RUN_FIRST);

            tuple->tupindex = state->currentRun;
            tuplesort_heap_insert(state, tuple, true);
        }

        /* Spill to tape if that pushed us over the memory limit. */
        dumptuples(state, false);
    }
    else
    {
        elog(ERROR, "invalid tuplesort state");
    }
}

/*
 * consider_abort_common - decide whether abbreviated keys should be dropped.
 *
 * Returns true if abbreviation was abandoned by this call; the leading sort
 * key then has its full comparator reinstated and abbrev_converter cleared.
 * Returns false if abbreviation remains in use.
 *
 * The abort routine is consulted only in TSS_INITIAL state and only once
 * memtupcount has reached abbrevNext, which doubles at each consultation.
 */
static bool
consider_abort_common(Tuplesortstate *state)
{
    Assert(state->sortKeys[0].abbrev_converter != NULL);
    Assert(state->sortKeys[0].abbrev_abort != NULL);
    Assert(state->sortKeys[0].abbrev_full_comparator != NULL);

    /* Only reconsider while everything still fits in memory ... */
    if (state->status != TSS_INITIAL)
        return false;

    /* ... and only at the next scheduled checkpoint. */
    if (state->memtupcount < state->abbrevNext)
        return false;

    state->abbrevNext *= 2;

    /* Ask the opclass-supplied routine whether abbreviation should stop. */
    if (!state->sortKeys[0].abbrev_abort(state->memtupcount,
                                         &state->sortKeys[0]))
        return false;

    /*
     * Abandon abbreviation: put the authoritative comparator back, and mark
     * abbreviation as not in play by clearing abbrev_converter.
     */
    state->sortKeys[0].comparator = state->sortKeys[0].abbrev_full_comparator;
    state->sortKeys[0].abbrev_converter = NULL;

    /* Clearing these two is tidiness only. */
    state->sortKeys[0].abbrev_abort = NULL;
    state->sortKeys[0].abbrev_full_comparator = NULL;

    /* Caller must now use the original (unabbreviated) representation. */
    return true;
}

/*
 * tuplesort_performsort - all input has been supplied; complete the sort.
 *
 * Runs in the sort's own memory context and restores the caller's context
 * before returning.
 */
void
tuplesort_performsort(Tuplesortstate *state)
{// #lizard forgives
    MemoryContext callerContext = MemoryContextSwitchTo(state->sortcontext);

#ifdef TRACE_SORT
    if (trace_sort)
        elog(LOG, "performsort starting: %s",
             pg_rusage_show(&state->ru_start));
#endif

    switch (state->status)
    {
        case TSS_INITIAL:
        case TSS_BOUNDED:

            /*
             * Everything needed for output is in memory.  Either the whole
             * input fit (plain qsort), or a bounded heap has been discarding
             * the excess and must now be turned into a sorted array.
             */
            if (state->status == TSS_INITIAL)
                tuplesort_sort_memtuples(state);
            else
                sort_bounded_heap(state);

            /* Reset the read position and mark, common to both cases. */
            state->current = 0;
            state->eof_reached = false;
            state->markpos_offset = 0;
            state->markpos_eof = false;
            state->status = TSS_SORTEDINMEM;
            break;

        case TSS_BUILDRUNS:

            /*
             * Tape-based sort: flush whatever is still in memory, then merge
             * down to a single run (or, if !randomAccess, one run per tape).
             * mergeruns() is responsible for setting state->status.
             */
            dumptuples(state, true);
            mergeruns(state);
            state->eof_reached = false;
            state->markpos_block = 0L;
            state->markpos_offset = 0;
            state->markpos_eof = false;
            break;

        default:
            elog(ERROR, "invalid tuplesort state");
            break;
    }

#ifdef TRACE_SORT
    if (trace_sort)
    {
        if (state->status != TSS_FINALMERGE)
            elog(LOG, "performsort done: %s",
                 pg_rusage_show(&state->ru_start));
        else
            elog(LOG, "performsort done (except %d-way final merge): %s",
                 state->activeTapes,
                 pg_rusage_show(&state->ru_start));
    }
#endif

    MemoryContextSwitchTo(callerContext);
}

/*
 * Internal routine to fetch the next tuple in either forward or back
 * direction into *stup.  Returns FALSE if no more tuples.
 * Returned tuple belongs to tuplesort memory context, and must not be freed
 * by caller.  Note that fetched tuple is stored in memory that may be
 * recycled by any future fetch.
 *
 * How the tuple is obtained depends on state->status:
 *
 *    TSS_SORTEDINMEM        read from memtuples[], using state->current as
 *                        the cursor
 *    TSS_SORTEDONTAPE    read from result_tape, using the length words
 *                        stored around each tuple to move in either
 *                        direction
 *    TSS_FINALMERGE        return the top of the merge heap and refill the
 *                        heap from the tape that tuple came from (forward
 *                        only)
 *
 * Any other status raises an error.
 */
static bool
tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
                          SortTuple *stup)
{// #lizard forgives
    unsigned int tuplen;
    size_t        nmoved;

    switch (state->status)
    {
        case TSS_SORTEDINMEM:
            Assert(forward || state->randomAccess);
            Assert(!state->slabAllocatorUsed);
            if (forward)
            {
                if (state->current < state->memtupcount)
                {
                    *stup = state->memtuples[state->current++];
                    return true;
                }
                state->eof_reached = true;

                /*
                 * Complain if caller tries to retrieve more tuples than
                 * originally asked for in a bounded sort.  This is because
                 * returning EOF here might be the wrong thing.
                 */
                if (state->bounded && state->current >= state->bound)
                    elog(ERROR, "retrieved too many tuples in a bounded sort");

                return false;
            }
            else
            {
                if (state->current <= 0)
                    return false;

                /*
                 * if all tuples are fetched already then we return last
                 * tuple, else - tuple before last returned.
                 */
                if (state->eof_reached)
                    state->eof_reached = false;
                else
                {
                    state->current--;    /* last returned tuple */
                    if (state->current <= 0)
                        return false;
                }
                /* current is one past the tuple we hand back */
                *stup = state->memtuples[state->current - 1];
                return true;
            }
            /* not reached: both branches above return */
            break;

        case TSS_SORTEDONTAPE:
            Assert(forward || state->randomAccess);
            Assert(state->slabAllocatorUsed);

            /*
             * The slot that held the tuple that we returned in previous
             * gettuple call can now be reused.
             */
            if (state->lastReturnedTuple)
            {
                RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
                state->lastReturnedTuple = NULL;
            }

            if (forward)
            {
                if (state->eof_reached)
                    return false;

                /*
                 * A length word of zero is treated as end of data.
                 * NOTE(review): the "true" argument presumably tells
                 * getlen() that reaching the end here is acceptable --
                 * confirm against getlen().
                 */
                if ((tuplen = getlen(state, state->result_tape, true)) != 0)
                {
                    READTUP(state, stup, state->result_tape, tuplen);

                    /*
                     * Remember the tuple we return, so that we can recycle
                     * its memory on next call.  (This can be NULL, in the
                     * !state->tuples case).
                     */
                    state->lastReturnedTuple = stup->tuple;

                    return true;
                }
                else
                {
                    state->eof_reached = true;
                    return false;
                }
            }

            /*
             * Backward.
             *
             * if all tuples are fetched already then we return last tuple,
             * else - tuple before last returned.
             */
            if (state->eof_reached)
            {
                /*
                 * Seek position is pointing just past the zero tuplen at the
                 * end of file; back up to fetch last tuple's ending length
                 * word.  If seek fails we must have a completely empty file.
                 */
                nmoved = LogicalTapeBackspace(state->tapeset,
                                              state->result_tape,
                                              2 * sizeof(unsigned int));
                if (nmoved == 0)
                    return false;
                else if (nmoved != 2 * sizeof(unsigned int))
                    elog(ERROR, "unexpected tape position");
                state->eof_reached = false;
            }
            else
            {
                /*
                 * Back up and fetch previously-returned tuple's ending length
                 * word.  If seek fails, assume we are at start of file.
                 */
                nmoved = LogicalTapeBackspace(state->tapeset,
                                              state->result_tape,
                                              sizeof(unsigned int));
                if (nmoved == 0)
                    return false;
                else if (nmoved != sizeof(unsigned int))
                    elog(ERROR, "unexpected tape position");
                tuplen = getlen(state, state->result_tape, false);

                /*
                 * Back up to get ending length word of tuple before it.
                 */
                nmoved = LogicalTapeBackspace(state->tapeset,
                                              state->result_tape,
                                              tuplen + 2 * sizeof(unsigned int));
                if (nmoved == tuplen + sizeof(unsigned int))
                {
                    /*
                     * We backed up over the previous tuple, but there was no
                     * ending length word before it.  That means that the prev
                     * tuple is the first tuple in the file.  It is now the
                     * next to read in forward direction (not obviously right,
                     * but that is what in-memory case does).
                     */
                    return false;
                }
                else if (nmoved != tuplen + 2 * sizeof(unsigned int))
                    elog(ERROR, "bogus tuple length in backward scan");
            }

            /*
             * Both branches above leave us positioned at the ending length
             * word of the tuple to return; read that length now.
             */
            tuplen = getlen(state, state->result_tape, false);

            /*
             * Now we have the length of the prior tuple, back up and read it.
             * Note: READTUP expects we are positioned after the initial
             * length word of the tuple, so back up to that point.
             */
            nmoved = LogicalTapeBackspace(state->tapeset,
                                          state->result_tape,
                                          tuplen);
            if (nmoved != tuplen)
                elog(ERROR, "bogus tuple length in backward scan");
            READTUP(state, stup, state->result_tape, tuplen);

            /*
             * Remember the tuple we return, so that we can recycle its memory
             * on next call. (This can be NULL, in the Datum case).
             */
            state->lastReturnedTuple = stup->tuple;

            return true;

        case TSS_FINALMERGE:
            Assert(forward);
            /* We are managing memory ourselves, with the slab allocator. */
            Assert(state->slabAllocatorUsed);

            /*
             * The slab slot holding the tuple that we returned in previous
             * gettuple call can now be reused.
             */
            if (state->lastReturnedTuple)
            {
                RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
                state->lastReturnedTuple = NULL;
            }

            /*
             * This code should match the inner loop of mergeonerun().
             */
            if (state->memtupcount > 0)
            {
                /* tupindex of a heap entry records the tape it was read from */
                int            srcTape = state->memtuples[0].tupindex;
                SortTuple    newtup;

                *stup = state->memtuples[0];

                /*
                 * Remember the tuple we return, so that we can recycle its
                 * memory on next call. (This can be NULL, in the Datum case).
                 */
                state->lastReturnedTuple = stup->tuple;

                /*
                 * Pull next tuple from tape, and replace the returned tuple
                 * at top of the heap with it.
                 */
                if (!mergereadnext(state, srcTape, &newtup))
                {
                    /*
                     * If no more data, we've reached end of run on this tape.
                     * Remove the top node from the heap.
                     */
                    tuplesort_heap_delete_top(state, false);

                    /*
                     * Rewind to free the read buffer.  It'd go away at the
                     * end of the sort anyway, but better to release the
                     * memory early.
                     *
                     * XXX PG10MERGE: In XL, we use tuplesort_begin_merge() to
                     * merge tuples from datanode connections. We don't use
                     * tapeset in that code path. The following fix helps that
                     * case.
                     */
                    if (state->tapeset)
                        LogicalTapeRewindForWrite(state->tapeset, srcTape);
                    /* *stup was filled in above, so there is a tuple to return */
                    return true;
                }
                /* tag the replacement with its source tape for the next refill */
                newtup.tupindex = srcTape;
                tuplesort_heap_replace_top(state, &newtup, false);
                return true;
            }
            /* merge heap is empty: every input is exhausted */
            return false;

        default:
            elog(ERROR, "invalid tuplesort state");
            return false;        /* keep compiler quiet */
    }
}

/*
 * tuplesort_gettupleslot - fetch the next tuple, forward or backward, into
 * a TupleTableSlot.
 *
 * On success the tuple is stored in the slot and TRUE is returned.  When no
 * tuple is available the slot is cleared and FALSE is returned.
 *
 * If abbreviation is in use and "abbrev" is non-NULL, the abbreviated value
 * of the leading key is passed back on a TRUE return.  Callers can use it
 * as a cheap "definitely not equal" test by plain binary inequality; a NULL
 * leading attribute yields a zeroed abbreviated value, which such a test
 * may rely on.
 *
 * With copy = true the slot gets its own copy of the tuple, owned by the
 * caller and unaffected by later tuplesort activity.  With copy = false the
 * slot merely points at a tuple held inside the tuplesort; that is cheaper,
 * but any subsequent manipulation of the tuplesort may invalidate the slot
 * contents.
 */
bool
tuplesort_gettupleslot(Tuplesortstate *state, bool forward, bool copy,
                       TupleTableSlot *slot, Datum *abbrev)
{
    MemoryContext callerContext;
    SortTuple    stup;
    MinimalTuple mtup;
    bool        fetched;

    /* The fetch itself runs in the sort's context. */
    callerContext = MemoryContextSwitchTo(state->sortcontext);
    fetched = tuplesort_gettuple_common(state, forward, &stup);
    MemoryContextSwitchTo(callerContext);

    /* Nothing fetched, or nothing to point the slot at. */
    if (!fetched || stup.tuple == NULL)
    {
        ExecClearTuple(slot);
        return false;
    }

    /* Pass the abbreviated key back when requested and meaningful. */
    if (state->sortKeys->abbrev_converter && abbrev)
        *abbrev = stup.datum1;

    /* Any copy is made here, in the caller's context. */
    mtup = (MinimalTuple) stup.tuple;
    if (copy)
        mtup = heap_copy_minimal_tuple(mtup);

    ExecStoreMinimalTuple(mtup, slot, copy);
    return true;
}

/*
 * tuplesort_getheaptuple - fetch the next heap tuple, forward or backward.
 *
 * Returns NULL once the tuples are exhausted.  The result lives in the
 * tuplesort's memory context: the caller must not free it, and must not
 * count on it staying valid after any further manipulation of the
 * tuplesort.
 */
HeapTuple
tuplesort_getheaptuple(Tuplesortstate *state, bool forward)
{
    MemoryContext callerContext;
    SortTuple    stup;
    bool        fetched;

    callerContext = MemoryContextSwitchTo(state->sortcontext);
    fetched = tuplesort_gettuple_common(state, forward, &stup);
    MemoryContextSwitchTo(callerContext);

    if (!fetched)
        return NULL;

    return stup.tuple;
}

/*
 * tuplesort_getindextuple - fetch the next index tuple, forward or backward.
 *
 * Returns NULL once the tuples are exhausted.  The result lives in the
 * tuplesort's memory context: the caller must not free it, and must not
 * count on it staying valid after any further manipulation of the
 * tuplesort.
 */
IndexTuple
tuplesort_getindextuple(Tuplesortstate *state, bool forward)
{
    MemoryContext callerContext;
    SortTuple    stup;
    bool        fetched;

    callerContext = MemoryContextSwitchTo(state->sortcontext);
    fetched = tuplesort_gettuple_common(state, forward, &stup);
    MemoryContextSwitchTo(callerContext);

    if (!fetched)
        return (IndexTuple) NULL;

    return (IndexTuple) stup.tuple;
}

/*
 * Fetch the next Datum in either forward or back direction.
 * Returns FALSE if no more datums; *val, *isNull and *abbrev are then left
 * untouched.
 *
 * If the Datum is pass-by-ref type, the returned value is freshly palloc'd
 * in the caller's memory context (the context that was current on entry)
 * and is now owned by the caller (this differs from similar routines for
 * other types of tuplesorts).
 *
 * Caller may optionally be passed back abbreviated value (on TRUE return
 * value) when abbreviation was used, which can be used to cheaply avoid
 * equality checks that might otherwise be required.  Caller can safely make a
 * determination of "non-equal tuple" based on simple binary inequality.  A
 * NULL value will have a zeroed abbreviated value representation, which caller
 * may rely on in abbreviated inequality check.
 */
bool
tuplesort_getdatum(Tuplesortstate *state, bool forward,
                   Datum *val, bool *isNull, Datum *abbrev)
{
    MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
    SortTuple    stup;

    if (!tuplesort_gettuple_common(state, forward, &stup))
    {
        MemoryContextSwitchTo(oldcontext);
        return false;
    }

    /*
     * Return to the caller's context before copying anything.  The fetch
     * above must run in sortcontext, but a copy made there would belong to
     * the sort rather than to the caller: it would vanish when the sort is
     * torn down, and would pile up inside the sort if the caller never
     * pfree'd it.
     */
    MemoryContextSwitchTo(oldcontext);

    /* Record abbreviated key for caller */
    if (state->sortKeys->abbrev_converter && abbrev)
        *abbrev = stup.datum1;

    if (stup.isnull1 || !state->tuples)
    {
        /* NULL or pass-by-value: datum1 is the value itself */
        *val = stup.datum1;
        *isNull = stup.isnull1;
    }
    else
    {
        /* use stup.tuple because stup.datum1 may be an abbreviation */
        *val = datumCopy(PointerGetDatum(stup.tuple), false, state->datumTypeLen);
        *isNull = false;
    }

    return true;
}

/*
 * tuplesort_skiptuples - step over N tuples without returning any data.
 *
 * N == 0 does nothing.  Returns TRUE if all N tuples could be skipped,
 * FALSE if the tuples ran out first.
 *
 * Only forward skipping is implemented; no caller needs the backward
 * direction yet, though the signature leaves room for it.
 */
bool
tuplesort_skiptuples(Tuplesortstate *state, int64 ntuples, bool forward)
{// #lizard forgives
    MemoryContext callerContext;
    int64        skipped;

    Assert(forward);
    Assert(ntuples >= 0);

    if (state->status == TSS_SORTEDINMEM)
    {
        /* In memory, skipping is just cursor arithmetic. */
        if (state->memtupcount - state->current < ntuples)
        {
            /* Not enough left: park the cursor at the end. */
            state->current = state->memtupcount;
            state->eof_reached = true;

            /*
             * In a bounded sort, running past the requested number of
             * tuples is an error, since reporting EOF could be wrong.
             */
            if (state->bounded && state->current >= state->bound)
                elog(ERROR, "retrieved too many tuples in a bounded sort");

            return false;
        }

        state->current += ntuples;
        return true;
    }

    if (state->status != TSS_SORTEDONTAPE &&
        state->status != TSS_FINALMERGE)
    {
        elog(ERROR, "invalid tuplesort state");
        return false;        /* keep compiler quiet */
    }

    /*
     * On tape or in final merge: simply fetch and discard, one tuple at a
     * time.  This could probably be made smarter, but so far it has not been
     * worth the trouble.
     */
    callerContext = MemoryContextSwitchTo(state->sortcontext);
    for (skipped = 0; skipped < ntuples; skipped++)
    {
        SortTuple    discard;

        if (!tuplesort_gettuple_common(state, forward, &discard))
        {
            MemoryContextSwitchTo(callerContext);
            return false;
        }
        CHECK_FOR_INTERRUPTS();
    }
    MemoryContextSwitchTo(callerContext);
    return true;
}

/*
 * tuplesort_merge_order - merge order to use for a given amount of memory.
 *
 * "Merge order" is simply the number of input tapes taking part in a merge.
 * allowedMem is in bytes.  Exported so that the planner can use it.
 */
int
tuplesort_merge_order(int64 allowedMem)
{
    int            order;

    /*
     * Every merge input needs a tape, and one more tape is needed for the
     * output; each tape needs TAPE_BUFFER_OVERHEAD of buffer space.  On top
     * of that we want MERGE_BUFFER_SIZE of workspace per input tape (the
     * output tape gets none).
     *
     * The memtuples[] array is not accounted for separately; it is
     * effectively treated as part of the MERGE_BUFFER_SIZE workspace.
     */
    order = (allowedMem - TAPE_BUFFER_OVERHEAD) /
        (MERGE_BUFFER_SIZE + TAPE_BUFFER_OVERHEAD);

    /*
     * Clamp to [MINORDER, MAXORDER].  The lower bound applies even with
     * minimal memory.  The upper bound exists because tapes are cheap but
     * not free: each one takes memory away from run building, which can
     * mean more runs and a slower merge even when a single pass still
     * suffices, and very wide merges are slow due to CPU cache effects --
     * paying the I/O cost of a polyphase merge can beat one pass across
     * many hundreds of tapes.
     */
    if (order < MINORDER)
        order = MINORDER;
    if (order > MAXORDER)
        order = MAXORDER;

    return order;
}

/*
 * useselection - choose the algorithm for sorting the first run.
 *
 * Returns true if replacement selection should be tried.  That can pay off
 * when it yields one large run and little workMem is available; see the
 * remarks on the RUN_SECOND optimization in dumptuples().
 */
static bool
useselection(Tuplesortstate *state)
{
    /*
     * The test deliberately uses memtupsize rather than memtupcount.  In
     * atypical cases memtupsize can be noticeably larger, and it seems
     * slightly preferable not to let recent outliers sway the decision.
     * (The caller's trace_sort output reports memtupcount instead.)
     */
    return state->memtupsize <= replacement_sort_tuples;
}

/*
 * inittapes - set up for tape-based sorting.
 *
 * Called only once it is clear that the sort will not fit in memory.  On
 * return the tape set and per-tape arrays exist, the polyphase-merge
 * bookkeeping is initialized, and the sort is in TSS_BUILDRUNS state.
 */
static void
inittapes(Tuplesortstate *state)
{// #lizard forgives
    int            ntapes;
    int            i;
    int64        bufferSpace;

    /* One tape per merge input, plus one for output. */
    ntapes = tuplesort_merge_order(state->allowedMem) + 1;

    state->maxTapes = ntapes;
    state->tapeRange = ntapes - 1;

#ifdef TRACE_SORT
    if (trace_sort)
        elog(LOG, "switching to external sort with %d tapes: %s",
             ntapes, pg_rusage_show(&state->ru_start));
#endif

    /*
     * Charge the tape buffers used while writing initial runs against
     * availMem, unless doing so would leave no room at all for tuples.
     * That is only likely when sorting pass-by-value Datums, where
     * memtuples[] can take more than half of allowedMem; tuple space does
     * not matter there, so an inaccurate LACKMEM is harmless.
     */
    bufferSpace = (int64) ntapes * TAPE_BUFFER_OVERHEAD;

    if (bufferSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem)
        USEMEM(state, bufferSpace);

    /*
     * The temp files behind the tape set must land in suitable temp
     * tablespaces.
     */
    PrepareTempTablespaces();

    /* Build the tape set and the per-tape bookkeeping arrays. */
    state->tapeset = LogicalTapeSetCreate(ntapes);

    state->mergeactive = (bool *) palloc0(ntapes * sizeof(bool));
    state->tp_fib = (int *) palloc0(ntapes * sizeof(int));
    state->tp_runs = (int *) palloc0(ntapes * sizeof(int));
    state->tp_dummy = (int *) palloc0(ntapes * sizeof(int));
    state->tp_tapenum = (int *) palloc0(ntapes * sizeof(int));

    /*
     * Possibly try replacement selection for the first run.  If it does not
     * manage to emit one long run, later runs fall back to a simple hybrid
     * sort-merge strategy.
     */
    state->replaceActive = useselection(state);

    if (state->replaceActive)
    {
        /*
         * Turn the unsorted memtuples[] contents into a heap, tagging every
         * tuple as belonging to the first run.
         *
         * checkIndex is passed as false: comparing run indexes is pointless
         * at this stage, even though they are meant to be part of the sort
         * key.
         */
        int            pending = state->memtupcount;

#ifdef TRACE_SORT
        if (trace_sort)
            elog(LOG, "replacement selection will sort %d first run tuples",
                 state->memtupcount);
#endif
        state->memtupcount = 0; /* make the heap empty */

        for (i = 0; i < pending; i++)
        {
            /* Work on a copy; the insert may overwrite the source slot. */
            SortTuple    entry = state->memtuples[i];

            entry.tupindex = RUN_FIRST;
            tuplesort_heap_insert(state, &entry, false);
        }
        Assert(state->memtupcount == pending);
    }

    state->currentRun = RUN_FIRST;

    /*
     * Algorithm D, step D1.  Every input tape starts with fib = dummy = 1
     * and no runs; the final (output) tape gets fib = dummy = 0.
     */
    for (i = 0; i < state->tapeRange; i++)
    {
        state->tp_fib[i] = 1;
        state->tp_runs[i] = 0;
        state->tp_dummy[i] = 1;
        state->tp_tapenum[i] = i;
    }
    state->tp_fib[state->tapeRange] = 0;
    state->tp_runs[state->tapeRange] = 0;
    state->tp_dummy[state->tapeRange] = 0;
    state->tp_tapenum[state->tapeRange] = state->tapeRange;

    state->Level = 1;
    state->destTape = 0;

    state->status = TSS_BUILDRUNS;
}

/*
 * selectnewtape -- choose the tape that will receive the next initial run.
 *
 * Called once a run has been completed and another one is known to follow.
 * Implements steps D3 and D4 of Knuth's Algorithm D: state->destTape is
 * advanced or reset to zero, and when the current tape has no dummy runs
 * left the merge level is bumped and the per-tape dummy-run counts and
 * Fibonacci targets are recomputed.
 */
static void
selectnewtape(Tuplesortstate *state)
{
    int           *dummy = state->tp_dummy;
    int           *fib = state->tp_fib;
    int            cur = state->destTape;

    /* Step D3: move on to the next tape while it has more dummies than us */
    if (dummy[cur] < dummy[cur + 1])
    {
        state->destTape = cur + 1;
        return;
    }

    if (dummy[cur] == 0)
    {
        /* Step D4: current level is full, so advance to the next level */
        int            firstFib = fib[0];
        int            t;

        state->Level++;
        for (t = 0; t < state->tapeRange; t++)
        {
            int            target = firstFib + fib[t + 1];

            dummy[t] = target - fib[t];
            fib[t] = target;
        }
    }

    /* In both remaining cases distribution restarts at the first tape */
    state->destTape = 0;
}

/*
 * Set up the slab allocation arena with room for numSlots slots.
 *
 * With numSlots > 0, one contiguous chunk of numSlots * SLAB_SLOT_SIZE bytes
 * is allocated and charged against the sort's memory budget, and all slots
 * are chained into the free list.  Otherwise the arena pointers and the free
 * list are simply cleared.  Either way the slab allocator is marked as in
 * use afterwards.
 */
static void
init_slab_allocator(Tuplesortstate *state, int numSlots)
{
    if (numSlots <= 0)
    {
        /* No arena needed; leave everything empty */
        state->slabMemoryBegin = NULL;
        state->slabMemoryEnd = NULL;
        state->slabFreeHead = NULL;
    }
    else
    {
        char       *slot;
        int            remaining;

        state->slabMemoryBegin = palloc(numSlots * SLAB_SLOT_SIZE);
        state->slabMemoryEnd = state->slabMemoryBegin +
            numSlots * SLAB_SLOT_SIZE;
        state->slabFreeHead = (SlabSlot *) state->slabMemoryBegin;
        USEMEM(state, numSlots * SLAB_SLOT_SIZE);

        /* Link each slot to its successor; the last one ends the list */
        slot = state->slabMemoryBegin;
        for (remaining = numSlots - 1; remaining > 0; remaining--)
        {
            char       *following = slot + SLAB_SLOT_SIZE;

            ((SlabSlot *) slot)->nextfree = (SlabSlot *) following;
            slot = following;
        }
        ((SlabSlot *) slot)->nextfree = NULL;
    }

    state->slabAllocatorUsed = true;
}

/*
 * mergeruns -- merge all the completed initial runs.
 *
 * This implements steps D5, D6 of Algorithm D.  All input data has
 * already been written to initial runs on tape (see dumptuples).
 *
 * On entry the sort must be in TSS_BUILDRUNS state with an empty
 * memtuples[] array.  On return, state->status is one of:
 *
 *    TSS_SORTEDONTAPE: the complete sorted output is on state->result_tape,
 *        which has been frozen.  This is reached either because only one
 *        initial run was produced (no merging needed) or because the merge
 *        ran to completion.
 *    TSS_FINALMERGE: only when !state->randomAccess; exactly one merge pass
 *        remains, the merge heap has been loaded by beginmerge(), and that
 *        last pass is left to be performed on the fly.
 */
static void
mergeruns(Tuplesortstate *state)
{// #lizard forgives
    int            tapenum,
                svTape,
                svRuns,
                svDummy;
    int            numTapes;
    int            numInputTapes;

    Assert(state->status == TSS_BUILDRUNS);
    Assert(state->memtupcount == 0);

    if (state->sortKeys != NULL && state->sortKeys->abbrev_converter != NULL)
    {
        /*
         * If there are multiple runs to be merged, when we go to read back
         * tuples from disk, abbreviated keys will not have been stored, and
         * we don't care to regenerate them.  Disable abbreviation from this
         * point on.
         */
        state->sortKeys->abbrev_converter = NULL;
        state->sortKeys->comparator = state->sortKeys->abbrev_full_comparator;

        /* Not strictly necessary, but be tidy */
        state->sortKeys->abbrev_abort = NULL;
        state->sortKeys->abbrev_full_comparator = NULL;
    }

    /*
     * Reset tuple memory.  We've freed all the tuples that we previously
     * allocated.  We will use the slab allocator from now on.
     */
    MemoryContextDelete(state->tuplecontext);
    state->tuplecontext = NULL;

    /*
     * We no longer need a large memtuples array.  (We will allocate a smaller
     * one for the heap later.)
     */
    FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
    pfree(state->memtuples);
    state->memtuples = NULL;

    /*
     * If we had fewer runs than tapes, refund the memory that we imagined we
     * would need for the tape buffers of the unused tapes.
     *
     * numTapes and numInputTapes reflect the actual number of tapes we will
     * use.  Note that the output tape's tape number is maxTapes - 1, so the
     * tape numbers of the used tapes are not consecutive, and you cannot just
     * loop from 0 to numTapes to visit all used tapes!
     */
    if (state->Level == 1)
    {
        /* Still on the first level: one input tape per run written so far */
        numInputTapes = state->currentRun;
        numTapes = numInputTapes + 1;
        FREEMEM(state, (state->maxTapes - numTapes) * TAPE_BUFFER_OVERHEAD);
    }
    else
    {
        /* Beyond the first level every tape is in use */
        numInputTapes = state->tapeRange;
        numTapes = state->maxTapes;
    }

    /*
     * Initialize the slab allocator.  We need one slab slot per input tape,
     * for the tuples in the heap, plus one to hold the tuple last returned
     * from tuplesort_gettuple.  (If we're sorting pass-by-val Datums,
     * however, we don't need to allocate anything.)
     *
     * From this point on, we no longer use the USEMEM()/LACKMEM() mechanism
     * to track memory usage of individual tuples.
     */
    if (state->tuples)
        init_slab_allocator(state, numInputTapes + 1);
    else
        init_slab_allocator(state, 0);

    /*
     * If we produced only one initial run (quite likely if the total data
     * volume is between 1X and 2X workMem when replacement selection is used,
     * but something we particularly count on when input is presorted), we can
     * just use that tape as the finished output, rather than doing a useless
     * merge.  (This obvious optimization is not in Knuth's algorithm.)
     */
    if (state->currentRun == RUN_SECOND)
    {
        state->result_tape = state->tp_tapenum[state->destTape];
        /* must freeze and rewind the finished output tape */
        LogicalTapeFreeze(state->tapeset, state->result_tape);
        state->status = TSS_SORTEDONTAPE;
        return;
    }

    /*
     * Allocate a new 'memtuples' array, for the heap.  It will hold one tuple
     * from each input tape.
     */
    state->memtupsize = numInputTapes;
    state->memtuples = (SortTuple *) palloc(numInputTapes * sizeof(SortTuple));
    USEMEM(state, GetMemoryChunkSpace(state->memtuples));

    /*
     * Use all the remaining memory we have available for read buffers among
     * the input tapes.
     *
     * We do this only after checking for the case that we produced only one
     * initial run, because there is no need to use a large read buffer when
     * we're reading from a single tape.  With one tape, the I/O pattern will
     * be the same regardless of the buffer size.
     *
     * We don't try to "rebalance" the memory among tapes, when we start a new
     * merge phase, even if some tapes are inactive in the new phase.  That
     * would be hard, because logtape.c doesn't know where one run ends and
     * another begins.  When a new merge phase begins, and a tape doesn't
     * participate in it, its buffer nevertheless already contains tuples from
     * the next run on same tape, so we cannot release the buffer.  That's OK
     * in practice, merge performance isn't that sensitive to the amount of
     * buffers used, and most merge phases use all or almost all tapes,
     * anyway.
     */
#ifdef TRACE_SORT
    if (trace_sort)
        elog(LOG, "using " INT64_FORMAT " KB of memory for read buffers among %d input tapes",
             (state->availMem) / 1024, numInputTapes);
#endif

    /*
     * Split what is left evenly among the input tapes.  The Max() clamps the
     * per-tape size at zero so that a negative availMem cannot produce a
     * negative buffer size.
     */
    state->read_buffer_size = Max(state->availMem / numInputTapes, 0);
    USEMEM(state, state->read_buffer_size * numInputTapes);

    /* End of step D2: rewind all output tapes to prepare for merging */
    for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
        LogicalTapeRewindForRead(state->tapeset, tapenum, state->read_buffer_size);

    /* Each iteration of this loop performs one merge level (steps D5, D6) */
    for (;;)
    {
        /*
         * At this point we know that tape[T] is empty.  If there's just one
         * (real or dummy) run left on each input tape, then only one merge
         * pass remains.  If we don't have to produce a materialized sorted
         * tape, we can stop at this point and do the final merge on-the-fly.
         */
        if (!state->randomAccess)
        {
            bool        allOneRun = true;

            Assert(state->tp_runs[state->tapeRange] == 0);
            for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
            {
                if (state->tp_runs[tapenum] + state->tp_dummy[tapenum] != 1)
                {
                    allOneRun = false;
                    break;
                }
            }
            if (allOneRun)
            {
                /* Tell logtape.c we won't be writing anymore */
                LogicalTapeSetForgetFreeSpace(state->tapeset);
                /* Initialize for the final merge pass */
                beginmerge(state);
                state->status = TSS_FINALMERGE;
                return;
            }
        }

        /* Step D5: merge runs onto tape[T] until tape[P] is empty */
        while (state->tp_runs[state->tapeRange - 1] ||
               state->tp_dummy[state->tapeRange - 1])
        {
            bool        allDummy = true;

            for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
            {
                if (state->tp_dummy[tapenum] == 0)
                {
                    allDummy = false;
                    break;
                }
            }

            if (allDummy)
            {
                /*
                 * Every input tape has a dummy run pending, so the "merge"
                 * is purely bookkeeping: credit one dummy run to the output
                 * tape and consume one from each input tape, with no I/O.
                 */
                state->tp_dummy[state->tapeRange]++;
                for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
                    state->tp_dummy[tapenum]--;
            }
            else
                mergeonerun(state);
        }

        /* Step D6: decrease level */
        if (--state->Level == 0)
            break;
        /* rewind output tape T to use as new input */
        LogicalTapeRewindForRead(state->tapeset, state->tp_tapenum[state->tapeRange],
                                 state->read_buffer_size);
        /* rewind used-up input tape P, and prepare it for write pass */
        LogicalTapeRewindForWrite(state->tapeset, state->tp_tapenum[state->tapeRange - 1]);
        state->tp_runs[state->tapeRange - 1] = 0;

        /*
         * reassign tape units per step D6; note we no longer care about A[]
         *
         * The entries are rotated by one position: the old output tape's
         * entry moves to slot 0 and every other entry shifts up one slot.
         */
        svTape = state->tp_tapenum[state->tapeRange];
        svDummy = state->tp_dummy[state->tapeRange];
        svRuns = state->tp_runs[state->tapeRange];
        for (tapenum = state->tapeRange; tapenum > 0; tapenum--)
        {
            state->tp_tapenum[tapenum] = state->tp_tapenum[tapenum - 1];
            state->tp_dummy[tapenum] = state->tp_dummy[tapenum - 1];
            state->tp_runs[tapenum] = state->tp_runs[tapenum - 1];
        }
        state->tp_tapenum[0] = svTape;
        state->tp_dummy[0] = svDummy;
        state->tp_runs[0] = svRuns;
    }

    /*
     * Done.  Knuth says that the result is on TAPE[1], but since we exited
     * the loop without performing the last iteration of step D6, we have not
     * rearranged the tape unit assignment, and therefore the result is on
     * TAPE[T].  We need to do it this way so that we can freeze the final
     * output tape while rewinding it.  The last iteration of step D6 would be
     * a waste of cycles anyway...
     */
    state->result_tape = state->tp_tapenum[state->tapeRange];
    LogicalTapeFreeze(state->tapeset, state->result_tape);
    state->status = TSS_SORTEDONTAPE;

    /* Release the read buffers of all the other tapes, by rewinding them. */
    for (tapenum = 0; tapenum < state->maxTapes; tapenum++)
    {
        if (tapenum != state->result_tape)
            LogicalTapeRewindForWrite(state->tapeset, tapenum);
    }
}

/*
 * mergeonerun -- merge one run from every active input tape.
 *
 * Input tapes whose next run is a dummy do not take part.  This is the
 * inner loop of Algorithm D step D5; the merged run is appended to TAPE[T],
 * and that tape's count of real runs is incremented once the run has been
 * terminated.
 */
static void
mergeonerun(Tuplesortstate *state)
{
    int            outTape = state->tp_tapenum[state->tapeRange];

    /*
     * Prime the heap with the first tuple of each participating tape; this
     * also consumes one real or dummy run from each input tape's counters.
     */
    beginmerge(state);

    /*
     * Classic k-way merge: emit the heap's smallest tuple, then refill the
     * heap from the tape that tuple came from, until the heap drains.
     */
    while (state->memtupcount > 0)
    {
        SortTuple  *top = &state->memtuples[0];
        int            fromTape = top->tupindex;
        SortTuple    next;

        /* emit the current minimum */
        WRITETUP(state, outTape, top);

        /* its slab slot can be reused for the tuple we are about to read */
        if (top->tuple)
        {
            RELEASE_SLAB_SLOT(state, top->tuple);
        }

        /* source run exhausted: just shrink the heap */
        if (!mergereadnext(state, fromTape, &next))
        {
            tuplesort_heap_delete_top(state, false);
            continue;
        }

        /* otherwise the freshly read tuple takes the root's place */
        next.tupindex = fromTape;
        tuplesort_heap_replace_top(state, &next, false);
    }

    /*
     * Heap is empty, so the output run is complete: terminate it and count
     * it as a real run on the output tape.
     */
    markrunend(state, outTape);
    state->tp_runs[state->tapeRange]++;

#ifdef TRACE_SORT
    if (trace_sort)
        elog(LOG, "finished %d-way merge step: %s", state->activeTapes,
             pg_rusage_show(&state->ru_start));
#endif
}

/*
 * beginmerge - set up for one merge pass
 *
 * For every input tape, consume either one dummy run or one real run from
 * its counters; tapes contributing a real run are flagged in mergeactive[]
 * and counted in state->activeTapes.  The (empty) merge heap is then loaded
 * with the first tuple read from each active tape.
 */
static void
beginmerge(Tuplesortstate *state)
{
    int            nactive = 0;
    int            slot;
    int            tape;

    /* The merge heap must start out empty */
    Assert(state->memtupcount == 0);

    /* Start with no tape active, then flag those supplying a real run */
    memset(state->mergeactive, 0,
           state->maxTapes * sizeof(*state->mergeactive));

    for (slot = 0; slot < state->tapeRange; slot++)
    {
        if (state->tp_dummy[slot] > 0)
        {
            /* a dummy run is "merged" simply by forgetting about it */
            state->tp_dummy[slot]--;
            continue;
        }

        Assert(state->tp_runs[slot] > 0);
        state->tp_runs[slot]--;
        state->mergeactive[state->tp_tapenum[slot]] = true;
        nactive++;
    }
    Assert(nactive > 0);
    state->activeTapes = nactive;

    /*
     * Seed the heap.  mergereadnext() returns false for tapes that are not
     * active, so it is fine to probe every physical tape number.
     */
    for (tape = 0; tape < state->maxTapes; tape++)
    {
        SortTuple    first;

        if (!mergereadnext(state, tape, &first))
            continue;

        first.tupindex = tape;
        tuplesort_heap_insert(state, &first, false);
    }
}

/*
 * mergereadnext - fetch the next tuple of the current run on a merge input
 *
 * On success *stup is filled in and true is returned.  Returns false if the
 * tape is not (or no longer) active in this merge, or if a zero length word
 * is read from it; in the latter case the tape is also marked inactive.
 */
static bool
mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup)
{
    unsigned int nbytes;

    /* nothing to read once the tape's run has been used up */
    if (!state->mergeactive[srcTape])
        return false;

    /* fetch the length word of the next tuple */
#ifdef PGXC
    nbytes = GETLEN(state, srcTape, true);
#else
    nbytes = getlen(state, srcTape, true);
#endif

    /* a zero length means there is no further tuple in this run */
    if (nbytes == 0)
    {
        state->mergeactive[srcTape] = false;
        return false;
    }

    READTUP(state, stup, srcTape, nbytes);
    return true;
}

/*
 * dumptuples - remove tuples from memtuples and write to tape
 *
 * This is used during initial-run building, but not during merging.
 *
 * Parameters:
 *    state     - the sort being performed.
 *    alltuples - true to flush everything held in memory, false to free up
 *                only as much as the loop condition below demands.
 *
 * When alltuples = false and replacement selection is still active, dump
 * only enough tuples to get under the availMem limit (and leave at least
 * one tuple in memtuples, since puttuple will then assume it is a heap that
 * has a tuple to compare to).  We always insist there be at least one free
 * slot in the memtuples[] array.
 *
 * When alltuples = true, dump everything currently in memory.  (This
 * case is only used at end of input data, although in practice only the
 * first run could fail to dump all tuples when we LACKMEM(), and only
 * when replacement selection is active.)
 *
 * If, when replacement selection is active, we see that the tuple run
 * number at the top of the heap has changed, start a new run.  This must be
 * the first run, because replacement selection is always abandoned for all
 * further runs.
 *
 * When replacement selection is not active, the whole job is handed to
 * dumpbatch(), which is called at most once per call of this function.
 */
static void
dumptuples(Tuplesortstate *state, bool alltuples)
{// #lizard forgives
    /*
     * Keep going while the caller wants everything flushed, while we are
     * over the memory budget with more than one tuple in hand, or while the
     * memtuples[] array has no free slot.
     */
    while (alltuples ||
           (LACKMEM(state) && state->memtupcount > 1) ||
           state->memtupcount >= state->memtupsize)
    {
        if (state->replaceActive)
        {
            /*
             * Still holding out for a case favorable to replacement
             * selection. Still incrementally spilling using heap.
             *
             * Dump the heap's frontmost entry, and remove it from the heap.
             */
            Assert(state->memtupcount > 0);
            WRITETUP(state, state->tp_tapenum[state->destTape],
                     &state->memtuples[0]);
            tuplesort_heap_delete_top(state, true);
        }
        else
        {
            /*
             * Once committed to quicksorting runs, never incrementally spill
             */
            dumpbatch(state, alltuples);
            /* dumpbatch() wrote out all of memtuples, so nothing is left */
            break;
        }

        /*
         * If top run number has changed, we've finished the current run (this
         * can only be the first run), and will no longer spill incrementally.
         */
        if (state->memtupcount == 0 ||
            state->memtuples[0].tupindex == HEAP_RUN_NEXT)
        {
            markrunend(state, state->tp_tapenum[state->destTape]);
            Assert(state->currentRun == RUN_FIRST);
            state->currentRun++;
            state->tp_runs[state->destTape]++;
            state->tp_dummy[state->destTape]--; /* per Alg D step D2 */

#ifdef TRACE_SORT
            if (trace_sort)
                elog(LOG, "finished incrementally writing %s run %d to tape %d: %s",
                     (state->memtupcount == 0) ? "only" : "first",
                     state->currentRun, state->destTape,
                     pg_rusage_show(&state->ru_start));
#endif

            /*
             * Done if heap is empty, which is possible when there is only one
             * long run.
             */
            Assert(state->currentRun == RUN_SECOND);
            if (state->memtupcount == 0)
            {
                /*
                 * Replacement selection best case; no final merge required,
                 * because there was only one initial run (second run has no
                 * tuples).  See RUN_SECOND case in mergeruns().
                 */
                break;
            }

            /*
             * Abandon replacement selection for second run (as well as any
             * subsequent runs).
             *
             * From here on the loop, if it iterates again, takes the
             * dumpbatch() branch above.
             */
            state->replaceActive = false;

            /*
             * First tuple of next run should not be heapified, and so will
             * bear placeholder run number.  In practice this must actually be
             * the second run, which just became the currentRun, so we're
             * clear to quicksort and dump the tuples in batch next time
             * memtuples becomes full.
             */
            Assert(state->memtuples[0].tupindex == HEAP_RUN_NEXT);
            selectnewtape(state);
        }
    }
}

/*
 * dumpbatch - quicksort all of memtuples and write it out as a single run
 *
 * This module never builds a heap for the second or any later run (though
 * heapification does honor the run-number distinction between the first and
 * second runs), and frequently no replacement selection priority queue is
 * used at all.
 *
 * When alltuples is false, another run is expected to follow, so a new
 * destination tape is selected before returning.
 */
static void
dumpbatch(Tuplesortstate *state, bool alltuples)
{// #lizard forgives
    int            ntowrite;
    int            next;

    /*
     * The last call may find nothing to sort: this happens in the rare case
     * where an earlier LACKMEM() dump coincided exactly with the end of the
     * input.
     *
     * More generally, a short final run is entirely possible.  We would
     * rather emit a run of zero tuples than special-case a selectnewtape()
     * call that was never followed by a run on destTape.
     *
     * A zero-tuple run is harmless for merging: mergereadnext(), when called
     * from beginmerge(), reliably marks such a tape inactive.  The effect is
     * much the same as mergeonerun() meeting a dummy run on that tape, which
     * needs no merging (or is "merged" trivially).  Knuth notes that
     * Algorithm D "isn't strictly optimal" in how it distributes runs and
     * assigns dummies; this corner case is very unlikely to make that
     * noticeably worse.
     */
    Assert(state->status == TSS_BUILDRUNS);

    /* Running out of run numbers is implausible, but check regardless */
    if (state->currentRun == INT_MAX)
        ereport(ERROR,
                (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
                 errmsg("cannot have more than %d runs for an external sort",
                        INT_MAX)));

    state->currentRun++;

#ifdef TRACE_SORT
    if (trace_sort)
        elog(LOG, "starting quicksort of run %d: %s",
             state->currentRun, pg_rusage_show(&state->ru_start));
#endif

    /* Everything gathered within the memory budget becomes this run */
    tuplesort_sort_memtuples(state);

#ifdef TRACE_SORT
    if (trace_sort)
        elog(LOG, "finished quicksort of run %d: %s",
             state->currentRun, pg_rusage_show(&state->ru_start));
#endif

    /* Write the tuples in sorted order, emptying memtuples as we go */
    ntowrite = state->memtupcount;
    next = 0;
    while (next < ntowrite)
    {
        WRITETUP(state, state->tp_tapenum[state->destTape],
                 &state->memtuples[next]);
        state->memtupcount--;
        next++;
    }

    /*
     * All tuples allocated so far have been released, so reset the tuple
     * context wholesale.  This guards against fragmentation when incoming
     * tuple sizes change sharply; AllocSetFree's size-class bucketing could
     * otherwise make that particularly bad.
     */
    MemoryContextReset(state->tuplecontext);

    /* Close the run and update the tape's bookkeeping */
    markrunend(state, state->tp_tapenum[state->destTape]);
    state->tp_runs[state->destTape]++;
    state->tp_dummy[state->destTape]--; /* per Alg D step D2 */

#ifdef TRACE_SORT
    if (trace_sort)
        elog(LOG, "finished writing run %d to tape %d: %s",
             state->currentRun, state->destTape,
             pg_rusage_show(&state->ru_start));
#endif

    /* No further run will follow a complete flush */
    if (alltuples)
        return;

    selectnewtape(state);
}

/*
 * tuplesort_rescan        - restart reading the sorted result from the top
 *
 * Only valid for random-access sorts that are fully sorted, either in
 * memory or on the result tape; any other state raises an error.  Any
 * marked position is reset to the start as well.
 */
void
tuplesort_rescan(Tuplesortstate *state)
{
    MemoryContext callerContext = MemoryContextSwitchTo(state->sortcontext);

    Assert(state->randomAccess);

    if (state->status == TSS_SORTEDINMEM)
    {
        /* in-memory result: just reset the array cursor and the mark */
        state->current = 0;
        state->eof_reached = false;
        state->markpos_offset = 0;
        state->markpos_eof = false;
    }
    else if (state->status == TSS_SORTEDONTAPE)
    {
        /* on-tape result: rewind the result tape and reset the mark */
        LogicalTapeRewindForRead(state->tapeset,
                                 state->result_tape,
                                 0);
        state->eof_reached = false;
        state->markpos_block = 0L;
        state->markpos_offset = 0;
        state->markpos_eof = false;
    }
    else
        elog(ERROR, "invalid tuplesort state");

    MemoryContextSwitchTo(callerContext);
}

/*
 * tuplesort_markpos    - remember the current read position in the result
 *
 * Only valid for random-access sorts that are fully sorted, either in
 * memory or on the result tape; any other state raises an error.
 */
void
tuplesort_markpos(Tuplesortstate *state)
{
    MemoryContext callerContext = MemoryContextSwitchTo(state->sortcontext);

    Assert(state->randomAccess);

    if (state->status == TSS_SORTEDINMEM)
    {
        /* the array cursor is the whole position */
        state->markpos_offset = state->current;
        state->markpos_eof = state->eof_reached;
    }
    else if (state->status == TSS_SORTEDONTAPE)
    {
        /* ask the tape layer where the result tape currently stands */
        LogicalTapeTell(state->tapeset,
                        state->result_tape,
                        &state->markpos_block,
                        &state->markpos_offset);
        state->markpos_eof = state->eof_reached;
    }
    else
        elog(ERROR, "invalid tuplesort state");

    MemoryContextSwitchTo(callerContext);
}

/*
 * tuplesort_restorepos - return to the read position last saved by
 *                          tuplesort_markpos
 *
 * Only valid for random-access sorts that are fully sorted, either in
 * memory or on the result tape; any other state raises an error.
 */
void
tuplesort_restorepos(Tuplesortstate *state)
{
    MemoryContext callerContext = MemoryContextSwitchTo(state->sortcontext);

    Assert(state->randomAccess);

    if (state->status == TSS_SORTEDINMEM)
    {
        /* put the array cursor back where it was */
        state->current = state->markpos_offset;
        state->eof_reached = state->markpos_eof;
    }
    else if (state->status == TSS_SORTEDONTAPE)
    {
        /* reposition the result tape at the saved block and offset */
        LogicalTapeSeek(state->tapeset,
                        state->result_tape,
                        state->markpos_block,
                        state->markpos_offset);
        state->eof_reached = state->markpos_eof;
    }
    else
        elog(ERROR, "invalid tuplesort state");

    MemoryContextSwitchTo(callerContext);
}

/*
 * tuplesort_get_stats - report how the sort was carried out
 *
 * Intended to be called once tuplesort_performsort() has finished.  Fills
 * *stats with the kind of space used (disk or memory), the amount used in
 * kilobytes, and the sort method.
 */
void
tuplesort_get_stats(Tuplesortstate *state,
                    TuplesortInstrumentation *stats)
{
    /*
     * Only one figure is reported: disk space if a tape set exists, memory
     * otherwise.  Reporting memory for a disk-based sort as well may look
     * desirable, but memory accounting stops being accurate as soon as
     * tuples are handed back to the caller (the pfree's the caller is
     * expected to do are not tracked), so availMem cannot be trusted for a
     * disk sort.  Fixing that does not seem worth the overhead.  Would an
     * API that lets the memory context code report actual usage of
     * sortcontext be worthwhile?
     */
    if (state->tapeset)
    {
        stats->spaceType = SORT_SPACE_TYPE_DISK;
        stats->spaceUsed = LogicalTapeSetBlocks(state->tapeset) * (BLCKSZ / 1024);
    }
    else
    {
        stats->spaceType = SORT_SPACE_TYPE_MEMORY;
        stats->spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
    }

    /* The final status tells us which algorithm produced the result */
    if (state->status == TSS_SORTEDINMEM)
        stats->sortMethod = state->boundUsed ?
            SORT_TYPE_TOP_N_HEAPSORT : SORT_TYPE_QUICKSORT;
    else if (state->status == TSS_SORTEDONTAPE)
        stats->sortMethod = SORT_TYPE_EXTERNAL_SORT;
    else if (state->status == TSS_FINALMERGE)
        stats->sortMethod = SORT_TYPE_EXTERNAL_MERGE;
    else
        stats->sortMethod = SORT_TYPE_STILL_IN_PROGRESS;
}

/*
 * Return a printable name for a TuplesortMethod value.
 *
 * Values not covered by the switch yield "unknown".
 */
const char *
tuplesort_method_name(TuplesortMethod m)
{
	const char *label = "unknown";

	/* no default case, so the compiler can still flag unhandled values */
	switch (m)
	{
		case SORT_TYPE_STILL_IN_PROGRESS:
			label = "still in progress";
			break;
		case SORT_TYPE_TOP_N_HEAPSORT:
			label = "top-N heapsort";
			break;
		case SORT_TYPE_QUICKSORT:
			label = "quicksort";
			break;
		case SORT_TYPE_EXTERNAL_SORT:
			label = "external sort";
			break;
		case SORT_TYPE_EXTERNAL_MERGE:
			label = "external merge";
			break;
	}

	return label;
}

/*
 * Return a printable name for a TuplesortSpaceType value.
 *
 * Only the disk and memory types are expected; anything that is not the
 * disk type is reported as "Memory".
 */
const char *
tuplesort_space_type_name(TuplesortSpaceType t)
{
	Assert(t == SORT_SPACE_TYPE_DISK || t == SORT_SPACE_TYPE_MEMORY);

	if (t == SORT_SPACE_TYPE_DISK)
		return "Disk";

	return "Memory";
}


/*
 * Heap manipulation routines, per Knuth's Algorithm 5.2.3H.
 *
 * Compare two SortTuples.  If checkIndex is true, use the tuple index
 * as the front of the sort key; otherwise, no.
 *
 * Note that for checkIndex callers, the heap invariant is never
 * maintained beyond the first run, and so there are no COMPARETUP()
 * calls needed to distinguish tuples in HEAP_RUN_NEXT.
 *
 * Concretely, the result is the difference of the two tupindex values when
 * checkIndex is set and either the tupindexes differ or tup1 belongs to
 * HEAP_RUN_NEXT; in every other case it is the result of COMPARETUP().
 *
 * This macro is not self-contained: besides its two arguments it uses the
 * identifiers "state" and "checkIndex", which must be in scope wherever it
 * is expanded.  Both arguments may be evaluated more than once, so they
 * must be free of side effects.
 */

#define HEAPCOMPARE(tup1,tup2) \
    (checkIndex && ((tup1)->tupindex != (tup2)->tupindex || \
                    (tup1)->tupindex == HEAP_RUN_NEXT) ? \
     ((tup1)->tupindex) - ((tup2)->tupindex) : \
     COMPARETUP(state, tup1, tup2))

/*
 * Turn the unordered memtuples[] array into a bounded heap that retains
 * only the smallest "state->bound" tuples; the rest are discarded.
 *
 * A bounded heap keeps its largest member at the root (array entry zero),
 * the opposite of the normal sort case, so that the largest entry is cheap
 * to throw away.  To get that ordering the sort direction is temporarily
 * reversed.
 *
 * Every entry of a bounded heap is assumed to have tupindex zero, so it does
 * not matter that HEAPCOMPARE() leaves tupindex comparisons unreversed.
 */
static void
make_bounded_heap(Tuplesortstate *state)
{
    int            ninput = state->memtupcount;
    int            pos;

    Assert(state->status == TSS_INITIAL);
    Assert(state->bounded);
    Assert(ninput >= state->bound);

    /* Flip the ordering so that the root holds the largest entry */
    reversedirection(state);

    /* Rebuild the array in place, starting from an empty heap */
    state->memtupcount = 0;
    for (pos = 0; pos < ninput; pos++)
    {
        SortTuple  *candidate = &state->memtuples[pos];

        if (state->memtupcount < state->bound)
        {
            /*
             * Still room in the heap.  Insert a private copy, because the
             * insertion may overwrite the source array slot.
             */
            SortTuple    copy = *candidate;

            copy.tupindex = 0;    /* not used */
            tuplesort_heap_insert(state, &copy, false);
            continue;
        }

        /*
         * Heap is at capacity.  Either the candidate displaces the current
         * largest entry at the root, or, being larger than everything we
         * hold, it is thrown away.
         */
        if (COMPARETUP(state, candidate, &state->memtuples[0]) > 0)
            tuplesort_heap_replace_top(state, candidate, false);
        else
        {
            free_sort_tuple(state, candidate);
            CHECK_FOR_INTERRUPTS();
        }
    }

    Assert(state->memtupcount == state->bound);
    state->status = TSS_BOUNDED;
}

/*
 * Turn the bounded heap into a fully sorted array, in place.
 *
 * Afterwards the sort is in TSS_SORTEDINMEM state and is flagged as having
 * used the bound.
 */
static void
sort_bounded_heap(Tuplesortstate *state)
{
    int            nkept = state->memtupcount;

    Assert(state->status == TSS_BOUNDED);
    Assert(state->bounded);
    Assert(nkept == state->bound);

    /*
     * In-place unheapify: every delete-top takes out the largest remaining
     * entry and vacates the slot just past the shrunken heap, which is
     * exactly where that entry belongs.  A heap of one entry needs no
     * further work.
     */
    for (;;)
    {
        SortTuple    largest;

        if (state->memtupcount <= 1)
            break;

        largest = state->memtuples[0];

        /* re-heapifies the remainder and decreases memtupcount */
        tuplesort_heap_delete_top(state, false);
        state->memtuples[state->memtupcount] = largest;
    }
    state->memtupcount = nkept;

    /*
     * Put the sort direction back the way it was.  Not strictly required,
     * but tidier.
     */
    reversedirection(state);

    state->status = TSS_SORTEDINMEM;
    state->boundUsed = true;
}

/*
 * Sort the memtuples array in memory with one of the specialized qsort
 * routines.
 *
 * The single-key variant is chosen when state->onlyKey is set; otherwise the
 * generic variant driven by state->comparetup is used.  Arrays of fewer than
 * two tuples are left untouched.
 */
static void
tuplesort_sort_memtuples(Tuplesortstate *state)
{
    /* Nothing to do for zero or one tuple */
    if (state->memtupcount <= 1)
        return;

    if (state->onlyKey == NULL)
    {
        /* General case: go through the comparetup callback */
        qsort_tuple(state->memtuples,
                    state->memtupcount,
                    state->comparetup,
                    state);
        return;
    }

    /* Single sort key: use the SortSupport-specialized sort */
    qsort_ssup(state->memtuples, state->memtupcount, state->onlyKey);
}

/*
 * Add one tuple to the heap (which may be empty), preserving the heap
 * invariant.  The caller must have made sure there is room in memtuples[].
 *
 * Note: some callers pass a pointer to a memtuples[] entry lying beyond the
 * current end of the heap.  That is fine unless the entry is the one directly
 * at index [memtupcount], which could be overwritten before its contents
 * have been moved into place.
 */
static void
tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple,
                      bool checkIndex)
{
    SortTuple  *heap = state->memtuples;
    int            hole;

    Assert(state->memtupcount < state->memtupsize);
    Assert(!checkIndex || tuple->tupindex == RUN_FIRST);

    CHECK_FOR_INTERRUPTS();

    /*
     * Open a hole at the end of the heap and bubble it towards the root
     * while the new tuple sorts before the hole's parent (Knuth 5.2.3,
     * exercise 16; note we use 0-based indexes where Knuth uses 1-based).
     */
    hole = state->memtupcount;
    state->memtupcount++;
    while (hole > 0)
    {
        int            parent = (hole - 1) / 2;

        if (HEAPCOMPARE(tuple, &heap[parent]) >= 0)
            break;                /* parent is in the right place */

        heap[hole] = heap[parent];
        hole = parent;
    }
    heap[hole] = *tuple;
}

/*
 * Drop the root entry (state->memtuples[0]) from the heap.
 *
 * memtupcount is decremented, and if any entries remain the former last
 * entry is re-inserted at the root so that the heap invariant holds again.
 *
 * Freeing whatever the removed entry pointed to is the caller's business and
 * must already have happened if it was needed.
 */
static void
tuplesort_heap_delete_top(Tuplesortstate *state, bool checkIndex)
{
    Assert(!checkIndex || state->currentRun == RUN_FIRST);

    state->memtupcount--;
    if (state->memtupcount > 0)
    {
        /*
         * The entry that used to be last now sits just past the end of the
         * heap; let it take over the root position.
         */
        SortTuple  *last = &state->memtuples[state->memtupcount];

        tuplesort_heap_replace_top(state, last, checkIndex);
    }
}

/*
 * Overwrite the root entry (state->memtuples[0]) with the given tuple and
 * restore the heap invariant by sifting.
 *
 * This is Knuth's "sift-up" (Algorithm 5.2.3H, Heapsort, steps H3-H8).
 */
static void
tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple,
                           bool checkIndex)
{
    SortTuple  *heap = state->memtuples;
    unsigned int hole;
    unsigned int child;
    unsigned int count;

    Assert(!checkIndex || state->currentRun == RUN_FIRST);
    Assert(state->memtupcount >= 1);

    CHECK_FOR_INTERRUPTS();

    /*
     * memtupcount is an "int", but the indexes here are "unsigned int" so
     * that "2 * hole + 1" cannot overflow: whenever it is computed we have
     * hole < count <= INT_MAX <= UINT_MAX/2.
     */
    count = state->memtupcount;
    hole = 0;                    /* the vacated position, initially the root */
    while ((child = 2 * hole + 1) < count)
    {
        unsigned int sibling = child + 1;

        /* Pick whichever child sorts first */
        if (sibling < count &&
            HEAPCOMPARE(&heap[child], &heap[sibling]) > 0)
            child = sibling;

        /* Stop once the new tuple can legitimately sit above that child */
        if (HEAPCOMPARE(tuple, &heap[child]) <= 0)
            break;

        /* Otherwise pull the child up and continue one level down */
        heap[hole] = heap[child];
        hole = child;
    }
    heap[hole] = *tuple;
}

/*
 * Invert the current sort direction: every sort key has both its
 * ssup_reverse and ssup_nulls_first flags toggled.
 *
 * It is not safe to call this when performing hash tuplesorts
 */
static void
reversedirection(Tuplesortstate *state)
{
    int            k;

    for (k = 0; k < state->nKeys; k++)
    {
        SortSupport key = &state->sortKeys[k];

        key->ssup_reverse = !key->ssup_reverse;
        key->ssup_nulls_first = !key->ssup_nulls_first;
    }
}


/*
 * Tape interface routines
 */

/*
 * Read the next length word from the given tape.
 *
 * Raises an error if a full length word cannot be read, or if the word is
 * zero and the caller did not pass eofOK.  Otherwise returns the word.
 */
static unsigned int
getlen(Tuplesortstate *state, int tapenum, bool eofOK)
{
    unsigned int lenword;

    if (LogicalTapeRead(state->tapeset, tapenum,
                        &lenword, sizeof(lenword)) != sizeof(lenword))
        elog(ERROR, "unexpected end of tape");

    /* A zero word is only acceptable when the caller allows it */
    if (!eofOK && lenword == 0)
        elog(ERROR, "unexpected end of data");

    return lenword;
}

/*
 * Terminate a run on the given tape by writing a zero length word.
 */
static void
markrunend(Tuplesortstate *state, int tapenum)
{
    unsigned int endmarker = 0;

    LogicalTapeWrite(state->tapeset, tapenum,
                     (void *) &endmarker, sizeof(endmarker));
}

/*
 * Obtain storage for a tuple being loaded by a READTUP() routine.
 *
 * The head of the slab free list is handed out when the tuple fits into a
 * slab slot; otherwise the memory comes from the sort context.
 */
static void *
readtup_alloc(Tuplesortstate *state, Size tuplen)
{
    SlabSlot   *slot;

    /*
     * Enough slab slots are pre-allocated that the free list should never
     * be found empty here.
     */
    Assert(state->slabFreeHead);

    slot = state->slabFreeHead;
    if (slot != NULL && tuplen <= SLAB_SLOT_SIZE)
    {
        /* Unlink the slot from the free list and reuse it */
        state->slabFreeHead = slot->nextfree;
        return slot;
    }

    /* Oversized tuple (or no free slot): fall back to a regular allocation */
    return MemoryContextAlloc(state->sortcontext, tuplen);
}


/*
 * Routines specialized for HeapTuple (actually MinimalTuple) case
 */

/*
 * Compare two MinimalTuple sort entries on all sort keys.
 *
 * The leading key is compared first using the datum1/isnull1 values cached
 * in the SortTuples.  If those compare equal, the tuples themselves are
 * consulted: first for the full leading key (only when the sort key has an
 * abbreviation converter, i.e. datum1 may be abbreviated), then for each
 * remaining key in order.
 *
 * Returns the first nonzero comparator result, or 0 if every key compares
 * equal.
 */
static int
comparetup_heap(const SortTuple *a, const SortTuple *b, Tuplesortstate *state)
{
    SortSupport sortKey = state->sortKeys;
    HeapTupleData ltup;
    HeapTupleData rtup;
    TupleDesc    tupDesc;
    int            nkey;
    int32        compare;
    AttrNumber    attno;
    Datum        datum1,
                datum2;
    bool        isnull1,
                isnull2;


    /* Compare the leading sort key */
    compare = ApplySortComparator(a->datum1, a->isnull1,
                                  b->datum1, b->isnull1,
                                  sortKey);
    if (compare != 0)
        return compare;

    /*
     * Compare additional sort keys.  Build temporary HeapTupleData headers
     * around the MinimalTuples so that heap_getattr() can be used on them.
     */
    ltup.t_len = ((MinimalTuple) a->tuple)->t_len + MINIMAL_TUPLE_OFFSET;
    ltup.t_data = (HeapTupleHeader) ((char *) a->tuple - MINIMAL_TUPLE_OFFSET);
    rtup.t_len = ((MinimalTuple) b->tuple)->t_len + MINIMAL_TUPLE_OFFSET;
    rtup.t_data = (HeapTupleHeader) ((char *) b->tuple - MINIMAL_TUPLE_OFFSET);

    /*
     * Give the remaining header fields a defined value on BOTH tuples, so
     * that neither temporary header carries uninitialized stack contents.
     */
    ltup.t_tableOid = InvalidOid;
    rtup.t_tableOid = InvalidOid;
    ltup.t_xc_node_id = InvalidOid;
    rtup.t_xc_node_id = InvalidOid;
    tupDesc = state->tupDesc;

    if (sortKey->abbrev_converter)
    {
        /* datum1 may be abbreviated: break the tie with the full values */
        attno = sortKey->ssup_attno;

        datum1 = heap_getattr(&ltup, attno, tupDesc, &isnull1);
        datum2 = heap_getattr(&rtup, attno, tupDesc, &isnull2);

        compare = ApplySortAbbrevFullComparator(datum1, isnull1,
                                                datum2, isnull2,
                                                sortKey);
        if (compare != 0)
            return compare;
    }

    /* Leading key is equal; walk the remaining keys in order */
    sortKey++;
    for (nkey = 1; nkey < state->nKeys; nkey++, sortKey++)
    {
        attno = sortKey->ssup_attno;

        datum1 = heap_getattr(&ltup, attno, tupDesc, &isnull1);
        datum2 = heap_getattr(&rtup, attno, tupDesc, &isnull2);

        compare = ApplySortComparator(datum1, isnull1,
                                      datum2, isnull2,
                                      sortKey);
        if (compare != 0)
            return compare;
    }

    return 0;
}

/*
 * Copy the tuple held in a TupleTableSlot into sort storage and fill in the
 * SortTuple for it.
 *
 * "tup" is expected to be a TupleTableSlot; a MinimalTuple is formed from it
 * in state->tuplecontext and charged against the sort's memory budget.  The
 * leading sort key is then extracted into stup->datum1/isnull1, abbreviated
 * if the sort key has an abbreviation converter and abbreviation has not
 * been aborted.
 */
static void
copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup)
{
    TupleTableSlot *slot = (TupleTableSlot *) tup;
    MemoryContext callercxt;
    MinimalTuple mtuple;
    HeapTupleData fake;
    Datum        firstkey;
    int            i;

    callercxt = MemoryContextSwitchTo(state->tuplecontext);

    /* Form the MinimalTuple copy and account for its memory */
    mtuple = ExecCopySlotMinimalTuple(slot);
    stup->tuple = (void *) mtuple;
    USEMEM(state, GetMemoryChunkSpace(mtuple));

    /* Wrap it in a temporary HeapTupleData to fetch the first-column key */
    fake.t_len = mtuple->t_len + MINIMAL_TUPLE_OFFSET;
    fake.t_data = (HeapTupleHeader) ((char *) mtuple - MINIMAL_TUPLE_OFFSET);
    fake.t_tableOid = InvalidOid;
    fake.t_xc_node_id = InvalidOid;

    firstkey = heap_getattr(&fake,
                            state->sortKeys[0].ssup_attno,
                            state->tupDesc,
                            &stup->isnull1);

    MemoryContextSwitchTo(callercxt);

    if (stup->isnull1 || !state->sortKeys->abbrev_converter)
    {
        /*
         * No abbreviation in use, or the key is NULL: keep the Datum exactly
         * as heap_getattr returned it.  The converter is never invoked for
         * NULL keys.
         */
        stup->datum1 = firstkey;
        return;
    }

    if (!consider_abort_common(state))
    {
        /* Abbreviation still worthwhile: store the abbreviated key */
        stup->datum1 = state->sortKeys->abbrev_converter(firstkey,
                                                         state->sortKeys);
        return;
    }

    /*
     * Abbreviation has been aborted.  The current tuple keeps its ordinary
     * key, and every tuple already in memtuples[] gets its datum1 rewritten
     * from the stored tuple, so that all in-memory entries use the same
     * (non-abbreviated) representation.  Tuples already dumped to tape need
     * no attention, since serialized tuples lack abbreviated keys
     * (TSS_BUILDRUNS state prevents control reaching here in any case).
     */
    stup->datum1 = firstkey;

    for (i = 0; i < state->memtupcount; i++)
    {
        SortTuple  *mtup = &state->memtuples[i];

        fake.t_len = ((MinimalTuple) mtup->tuple)->t_len +
            MINIMAL_TUPLE_OFFSET;
        fake.t_data = (HeapTupleHeader) ((char *) mtup->tuple -
                                         MINIMAL_TUPLE_OFFSET);

        mtup->datum1 = heap_getattr(&fake,
                                    state->sortKeys[0].ssup_attno,
                                    state->tupDesc,
                                    &mtup->isnull1);
    }
}

/*
 * Write one MinimalTuple sort entry to the given tape.
 *
 * The on-tape record is a length word, the tuple body starting at
 * MINIMAL_TUPLE_DATA_OFFSET, and (for random-access sorts) a trailing copy
 * of the length word.  Unless the slab allocator is in use, the in-memory
 * tuple is released afterwards and its space returned to the budget.
 */
static void
writetup_heap(Tuplesortstate *state, int tapenum, SortTuple *stup)
{
    MinimalTuple mtuple = (MinimalTuple) stup->tuple;

    /* only the portion past MINIMAL_TUPLE_DATA_OFFSET goes to tape */
    char       *body = (char *) mtuple + MINIMAL_TUPLE_DATA_OFFSET;
    unsigned int bodylen = mtuple->t_len - MINIMAL_TUPLE_DATA_OFFSET;

    /* length word counts the body plus the word itself */
    unsigned int reclen = bodylen + sizeof(int);

    LogicalTapeWrite(state->tapeset, tapenum,
                     (void *) &reclen, sizeof(reclen));
    LogicalTapeWrite(state->tapeset, tapenum,
                     (void *) body, bodylen);

    /* random access needs the length repeated after the body */
    if (state->randomAccess)
        LogicalTapeWrite(state->tapeset, tapenum,
                         (void *) &reclen, sizeof(reclen));

    if (state->slabAllocatorUsed)
        return;

    FREEMEM(state, GetMemoryChunkSpace(mtuple));
    heap_free_minimal_tuple(mtuple);
}

/*
 * Read one MinimalTuple sort entry back from the given tape.
 *
 * "len" is the record's leading length word (presumably already consumed by
 * the caller); the body length is len minus sizeof(int).  Storage comes from
 * readtup_alloc().  After the body (and, for random-access sorts, the
 * trailing length word) has been read, stup->tuple is set and the leading
 * sort key is extracted into stup->datum1/isnull1.
 */
static void
readtup_heap(Tuplesortstate *state, SortTuple *stup,
             int tapenum, unsigned int len)
{
    unsigned int tupbodylen = len - sizeof(int);
    unsigned int tuplen = tupbodylen + MINIMAL_TUPLE_DATA_OFFSET;
    MinimalTuple tuple = (MinimalTuple) readtup_alloc(state, tuplen);
    char       *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
    HeapTupleData htup;

    /* read in the tuple proper */
    tuple->t_len = tuplen;
    LogicalTapeReadExact(state->tapeset, tapenum,
                         tupbody, tupbodylen);
    if (state->randomAccess)    /* need trailing length word? */
        LogicalTapeReadExact(state->tapeset, tapenum,
                             &tuplen, sizeof(tuplen));
    stup->tuple = (void *) tuple;

    /*
     * Set up first-column key value.  The temporary header gets the same
     * field initialization as in copytup_heap(), so that heap_getattr() is
     * never handed uninitialized t_tableOid / t_xc_node_id values.
     */
    htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET;
    htup.t_data = (HeapTupleHeader) ((char *) tuple - MINIMAL_TUPLE_OFFSET);
    htup.t_tableOid = InvalidOid;
    htup.t_xc_node_id = InvalidOid;
    stup->datum1 = heap_getattr(&htup,
                                state->sortKeys[0].ssup_attno,
                                state->tupDesc,
                                &stup->isnull1);
}

#ifdef PGXC
/*
 * "Length word" fetch for the datanode-merge case.
 *
 * Fetches the next tuple from connection "tapenum" of the sort's response
 * combiner.  If a tuple arrives it is made available in the combiner's
 * result slot and 1 is returned.  If there is none, 0 is returned when eofOK
 * is set, and an error is raised otherwise.
 */
static unsigned int
getlen_datanode(Tuplesortstate *state, int tapenum, bool eofOK)
{
    ResponseCombiner *combiner = state->combiner;
    TupleTableSlot   *resultslot = combiner->ss.ps.ps_ResultTupleSlot;
    TupleTableSlot   *fetched;

    combiner->current_conn = tapenum;
    fetched = FetchTuple(combiner);

    if (!TupIsNull(fetched))
    {
        /* Copy into the result slot unless the tuple is already there */
        if (fetched != resultslot)
            ExecCopySlot(resultslot, fetched);
        return 1;
    }

    /* No more tuples on this connection */
    if (!eofOK)
        elog(ERROR, "unexpected end of data");

    return 0;
}

/*
 * Load the tuple currently held in the combiner's result slot into sort
 * storage and fill in the SortTuple for it.
 *
 * The slot must not be empty.  "tapenum" and "len" are not used here.  The
 * copied MinimalTuple is charged against the sort's memory budget, and the
 * leading sort key is extracted into stup->datum1/isnull1.
 */
static void
readtup_datanode(Tuplesortstate *state, SortTuple *stup,
                 int tapenum, unsigned int len)
{
    TupleTableSlot *slot = state->combiner->ss.ps.ps_ResultTupleSlot;
    MinimalTuple tuple;
    HeapTupleData htup;

    Assert(!TupIsNull(slot));

    /* copy the tuple into sort storage */
    tuple = ExecCopySlotMinimalTuple(slot);
    stup->tuple = (void *) tuple;
    USEMEM(state, GetMemoryChunkSpace(tuple));

    /*
     * Set up first-column key value.  Initialize the temporary header the
     * same way copytup_heap() does, including t_xc_node_id, so that no
     * field is left holding uninitialized stack contents.
     */
    htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET;
    htup.t_data = (HeapTupleHeader) ((char *) tuple - MINIMAL_TUPLE_OFFSET);
    htup.t_tableOid = InvalidOid;
    htup.t_xc_node_id = InvalidOid;
    stup->datum1 = heap_getattr(&htup,
                                state->sortKeys[0].ssup_attno,
                                state->tupDesc,
                                &stup->isnull1);
}
#endif /* PGXC */

/*
 * Routines specialized for the CLUSTER case (HeapTuple data, with
 * comparisons per a btree index definition)
 */

/*
 * Compare two HeapTuple sort entries according to the index definition in
 * state->indexInfo.
 *
 * If the first index column is a plain heap attribute (its entry in
 * ii_KeyAttrNumbers is nonzero), the cached datum1/isnull1 values are
 * compared first.  The remaining keys, or all keys when the first column is
 * not a plain attribute, are compared either by fetching heap attributes
 * directly or, when the index has expressions, by computing the full index
 * tuple for both sides with FormIndexDatum().
 *
 * Returns the first nonzero comparator result, or 0 if all keys are equal.
 */
static int
comparetup_cluster(const SortTuple *a, const SortTuple *b,
                   Tuplesortstate *state)
{// #lizard forgives
    SortSupport sortKey = state->sortKeys;
    HeapTuple    ltup;
    HeapTuple    rtup;
    TupleDesc    tupDesc;
    int            nkey;
    int32        compare;
    Datum        datum1,
                datum2;
    bool        isnull1,
                isnull2;
    AttrNumber    leading = state->indexInfo->ii_KeyAttrNumbers[0];

    /* Be prepared to compare additional sort keys */
    ltup = (HeapTuple) a->tuple;
    rtup = (HeapTuple) b->tuple;
    tupDesc = state->tupDesc;

    /* Compare the leading sort key, if it's simple */
    if (leading != 0)
    {
        compare = ApplySortComparator(a->datum1, a->isnull1,
                                      b->datum1, b->isnull1,
                                      sortKey);
        if (compare != 0)
            return compare;

        /* datum1 may be abbreviated: break the tie using the full values */
        if (sortKey->abbrev_converter)
        {
            datum1 = heap_getattr(ltup, leading, tupDesc, &isnull1);
            datum2 = heap_getattr(rtup, leading, tupDesc, &isnull2);

            compare = ApplySortAbbrevFullComparator(datum1, isnull1,
                                                    datum2, isnull2,
                                                    sortKey);
        }
        /* Done if the leading key decided it, or if it is the only key */
        if (compare != 0 || state->nKeys == 1)
            return compare;
        /* Compare additional columns the hard way */
        sortKey++;
        nkey = 1;
    }
    else
    {
        /* Must compare all keys the hard way */
        nkey = 0;
    }

    if (state->indexInfo->ii_Expressions == NULL)
    {
        /* If not expression index, just compare the proper heap attrs */

        for (; nkey < state->nKeys; nkey++, sortKey++)
        {
            AttrNumber    attno = state->indexInfo->ii_KeyAttrNumbers[nkey];

            datum1 = heap_getattr(ltup, attno, tupDesc, &isnull1);
            datum2 = heap_getattr(rtup, attno, tupDesc, &isnull2);

            compare = ApplySortComparator(datum1, isnull1,
                                          datum2, isnull2,
                                          sortKey);
            if (compare != 0)
                return compare;
        }
    }
    else
    {
        /*
         * In the expression index case, compute the whole index tuple and
         * then compare values.  It would perhaps be faster to compute only as
         * many columns as we need to compare, but that would require
         * duplicating all the logic in FormIndexDatum.
         */
        Datum        l_index_values[INDEX_MAX_KEYS];
        bool        l_index_isnull[INDEX_MAX_KEYS];
        Datum        r_index_values[INDEX_MAX_KEYS];
        bool        r_index_isnull[INDEX_MAX_KEYS];
        TupleTableSlot *ecxt_scantuple;

        /* Reset context each time to prevent memory leakage */
        ResetPerTupleExprContext(state->estate);

        ecxt_scantuple = GetPerTupleExprContext(state->estate)->ecxt_scantuple;

        /* Evaluate the index columns for the left tuple ... */
        ExecStoreTuple(ltup, ecxt_scantuple, InvalidBuffer, false);
        FormIndexDatum(state->indexInfo, ecxt_scantuple, state->estate,
                       l_index_values, l_index_isnull);

        /* ... and for the right tuple, reusing the same scan slot */
        ExecStoreTuple(rtup, ecxt_scantuple, InvalidBuffer, false);
        FormIndexDatum(state->indexInfo, ecxt_scantuple, state->estate,
                       r_index_values, r_index_isnull);

        /* Compare the not-yet-compared columns, starting at nkey */
        for (; nkey < state->nKeys; nkey++, sortKey++)
        {
            compare = ApplySortComparator(l_index_values[nkey],
                                          l_index_isnull[nkey],
                                          r_index_values[nkey],
                                          r_index_isnull[nkey],
                                          sortKey);
            if (compare != 0)
                return compare;
        }
    }

    return 0;
}

/*
 * Copy a HeapTuple into sort storage and fill in the SortTuple for it.
 *
 * The copy is made in state->tuplecontext and charged against the sort's
 * memory budget.  If the first index column is a simple heap column (its
 * ii_KeyAttrNumbers entry is nonzero), its value is extracted into
 * stup->datum1/isnull1, abbreviated when an abbreviation converter exists
 * and abbreviation has not been aborted; otherwise datum1/isnull1 are left
 * untouched.
 */
static void
copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup)
{
    HeapTuple    copied;
    Datum        firstkey;
    MemoryContext callercxt;
    int            i;

    /* Duplicate the tuple inside the sort's tuple context */
    callercxt = MemoryContextSwitchTo(state->tuplecontext);
    copied = heap_copytuple((HeapTuple) tup);
    stup->tuple = (void *) copied;
    USEMEM(state, GetMemoryChunkSpace(copied));
    MemoryContextSwitchTo(callercxt);

    /* Nothing more to do unless the leading column is a simple one */
    if (state->indexInfo->ii_KeyAttrNumbers[0] == 0)
        return;

    firstkey = heap_getattr(copied,
                            state->indexInfo->ii_KeyAttrNumbers[0],
                            state->tupDesc,
                            &stup->isnull1);

    if (stup->isnull1 || !state->sortKeys->abbrev_converter)
    {
        /*
         * No abbreviation in use, or the key is NULL: keep the Datum exactly
         * as heap_getattr returned it.  The converter is never invoked for
         * NULL keys.
         */
        stup->datum1 = firstkey;
        return;
    }

    if (!consider_abort_common(state))
    {
        /* Abbreviation still worthwhile: store the abbreviated key */
        stup->datum1 = state->sortKeys->abbrev_converter(firstkey,
                                                         state->sortKeys);
        return;
    }

    /*
     * Abbreviation has been aborted.  The current tuple keeps its ordinary
     * key, and every tuple already in memtuples[] gets its datum1 rewritten
     * from the stored tuple, so that all in-memory entries use the same
     * (non-abbreviated) representation.  Tuples already dumped to tape need
     * no attention, since serialized tuples lack abbreviated keys
     * (TSS_BUILDRUNS state prevents control reaching here in any case).
     */
    stup->datum1 = firstkey;

    for (i = 0; i < state->memtupcount; i++)
    {
        SortTuple  *mtup = &state->memtuples[i];
        HeapTuple    stored = (HeapTuple) mtup->tuple;

        mtup->datum1 = heap_getattr(stored,
                                    state->indexInfo->ii_KeyAttrNumbers[0],
                                    state->tupDesc,
                                    &mtup->isnull1);
    }
}

/*
 * Write one HeapTuple sort entry to the given tape.
 *
 * The on-tape record is a length word, the tuple's t_self, the tuple data,
 * and (for random-access sorts) a trailing copy of the length word.  Unless
 * the slab allocator is in use, the in-memory tuple is released afterwards
 * and its space returned to the budget.
 */
static void
writetup_cluster(Tuplesortstate *state, int tapenum, SortTuple *stup)
{
    HeapTuple    htup = (HeapTuple) stup->tuple;
    unsigned int reclen;

    /* length word covers the data, t_self, and the word itself */
    reclen = htup->t_len + sizeof(ItemPointerData) + sizeof(int);

    /* Of the HeapTupleData fields, only t_self is preserved on tape */
    LogicalTapeWrite(state->tapeset, tapenum,
                     &reclen, sizeof(reclen));
    LogicalTapeWrite(state->tapeset, tapenum,
                     &htup->t_self, sizeof(ItemPointerData));
    LogicalTapeWrite(state->tapeset, tapenum,
                     htup->t_data, htup->t_len);

    /* random access needs the length repeated after the body */
    if (state->randomAccess)
        LogicalTapeWrite(state->tapeset, tapenum,
                         &reclen, sizeof(reclen));

    if (state->slabAllocatorUsed)
        return;

    FREEMEM(state, GetMemoryChunkSpace(htup));
    heap_freetuple(htup);
}

/*
 * Read one HeapTuple sort entry back from the given tape.
 *
 * "tuplen" is the record's leading length word; the tuple data length is
 * that minus the sizes of the stored t_self and of the length word.  The
 * HeapTupleData header and the tuple data share a single allocation
 * obtained from readtup_alloc().  If the first index column is a simple
 * heap column, its value is extracted into stup->datum1/isnull1.
 */
static void
readtup_cluster(Tuplesortstate *state, SortTuple *stup,
                int tapenum, unsigned int tuplen)
{
    unsigned int datalen = tuplen - sizeof(ItemPointerData) - sizeof(int);
    HeapTuple    htup;

    htup = (HeapTuple) readtup_alloc(state, datalen + HEAPTUPLESIZE);

    /* Rebuild the HeapTupleData header; data follows it directly */
    htup->t_len = datalen;
    htup->t_data = (HeapTupleHeader) ((char *) htup + HEAPTUPLESIZE);
    LogicalTapeReadExact(state->tapeset, tapenum,
                         &htup->t_self, sizeof(ItemPointerData));
    /* t_tableOid is not stored on tape, so it is not reconstructed */
    htup->t_tableOid = InvalidOid;
#ifdef PGXC
    htup->t_xc_node_id = 0;
#endif

    /* Now the tuple data itself */
    LogicalTapeReadExact(state->tapeset, tapenum,
                         htup->t_data, htup->t_len);

    /* random-access sorts carry a trailing length word; consume it */
    if (state->randomAccess)
        LogicalTapeReadExact(state->tapeset, tapenum,
                             &tuplen, sizeof(tuplen));

    stup->tuple = (void *) htup;

    /* Extract the first-column key value only for a simple leading column */
    if (state->indexInfo->ii_KeyAttrNumbers[0] != 0)
        stup->datum1 = heap_getattr(htup,
                                    state->indexInfo->ii_KeyAttrNumbers[0],
                                    state->tupDesc,
                                    &stup->isnull1);
}

/*
 * Routines specialized for IndexTuple case
 *
 * The btree and hash cases require separate comparison functions, but the
 * IndexTuple representation is the same so the copy/write/read support
 * functions can be shared.
 */

/*
 * Compare two IndexTuple sort entries for a btree index build.
 *
 * Keys are compared in order, starting with the cached datum1/isnull1 of the
 * leading key.  If all keys are equal, no compared key was NULL, and
 * state->enforceUnique is set, a unique-violation error is raised.
 * Otherwise equal keys are ordered by the tuples' t_tid (block number, then
 * offset number).
 *
 * Returns the first nonzero comparison result, or 0 if the keys and the
 * item pointers are all equal.
 */
static int
comparetup_index_btree(const SortTuple *a, const SortTuple *b,
                       Tuplesortstate *state)
{// #lizard forgives
    /*
     * This is similar to comparetup_heap(), but expects index tuples.  There
     * is also special handling for enforcing uniqueness, and special
     * treatment for equal keys at the end.
     */
    SortSupport sortKey = state->sortKeys;
    IndexTuple    tuple1;
    IndexTuple    tuple2;
    int            keysz;
    TupleDesc    tupDes;
    bool        equal_hasnull = false;    /* any equal key pair was NULL? */
    int            nkey;
    int32        compare;
    Datum        datum1,
                datum2;
    bool        isnull1,
                isnull2;


    /* Compare the leading sort key */
    compare = ApplySortComparator(a->datum1, a->isnull1,
                                  b->datum1, b->isnull1,
                                  sortKey);
    if (compare != 0)
        return compare;

    /* Compare additional sort keys */
    tuple1 = (IndexTuple) a->tuple;
    tuple2 = (IndexTuple) b->tuple;
    keysz = state->nKeys;
    tupDes = RelationGetDescr(state->indexRel);

    /* datum1 may be abbreviated: break the tie using the full values */
    if (sortKey->abbrev_converter)
    {
        datum1 = index_getattr(tuple1, 1, tupDes, &isnull1);
        datum2 = index_getattr(tuple2, 1, tupDes, &isnull2);

        compare = ApplySortAbbrevFullComparator(datum1, isnull1,
                                                datum2, isnull2,
                                                sortKey);
        if (compare != 0)
            return compare;
    }

    /* they are equal, so we only need to examine one null flag */
    if (a->isnull1)
        equal_hasnull = true;

    /* Remaining keys; index attribute numbers are 1-based, hence nkey = 2 */
    sortKey++;
    for (nkey = 2; nkey <= keysz; nkey++, sortKey++)
    {
        datum1 = index_getattr(tuple1, nkey, tupDes, &isnull1);
        datum2 = index_getattr(tuple2, nkey, tupDes, &isnull2);

        compare = ApplySortComparator(datum1, isnull1,
                                      datum2, isnull2,
                                      sortKey);
        if (compare != 0)
            return compare;        /* done when we find unequal attributes */

        /* they are equal, so we only need to examine one null flag */
        if (isnull1)
            equal_hasnull = true;
    }

    /*
     * If btree has asked us to enforce uniqueness, complain if two equal
     * tuples are detected (unless there was at least one NULL field).
     *
     * It is sufficient to make the test here, because if two tuples are equal
     * they *must* get compared at some stage of the sort --- otherwise the
     * sort algorithm wouldn't have checked whether one must appear before the
     * other.
     */
    if (state->enforceUnique && !equal_hasnull)
    {
        Datum        values[INDEX_MAX_KEYS];
        bool        isnull[INDEX_MAX_KEYS];
        char       *key_desc;

        /*
         * Some rather brain-dead implementations of qsort (such as the one in
         * QNX 4) will sometimes call the comparison routine to compare a
         * value to itself, but we always use our own implementation, which
         * does not.
         */
        Assert(tuple1 != tuple2);

        /* Describe the duplicated key for the error detail, if possible */
        index_deform_tuple(tuple1, tupDes, values, isnull);

        key_desc = BuildIndexValueDescription(state->indexRel, values, isnull);

        ereport(ERROR,
                (errcode(ERRCODE_UNIQUE_VIOLATION),
                 errmsg("could not create unique index \"%s\"",
                        RelationGetRelationName(state->indexRel)),
                 key_desc ? errdetail("Key %s is duplicated.", key_desc) :
                 errdetail("Duplicate keys exist."),
                 errtableconstraint(state->heapRel,
                                    RelationGetRelationName(state->indexRel))));
    }

    /*
     * If key values are equal, we sort on ItemPointer.  This does not affect
     * validity of the finished index, but it may be useful to have index
     * scans in physical order.
     */
    {
        BlockNumber blk1 = ItemPointerGetBlockNumber(&tuple1->t_tid);
        BlockNumber blk2 = ItemPointerGetBlockNumber(&tuple2->t_tid);

        if (blk1 != blk2)
            return (blk1 < blk2) ? -1 : 1;
    }
    {
        OffsetNumber pos1 = ItemPointerGetOffsetNumber(&tuple1->t_tid);
        OffsetNumber pos2 = ItemPointerGetOffsetNumber(&tuple2->t_tid);

        if (pos1 != pos2)
            return (pos1 < pos2) ? -1 : 1;
    }

    return 0;
}

/*
 * Compare two IndexTuple sort entries for a hash index build.
 *
 * Entries are ordered by the bucket number computed from the hash key held
 * in datum1, and within a bucket by the tuples' t_tid (block number, then
 * offset number).  Returns -1, 0 or 1.
 */
static int
comparetup_index_hash(const SortTuple *a, const SortTuple *b,
                      Tuplesortstate *state)
{
    Bucket        lbucket;
    Bucket        rbucket;
    IndexTuple    litup;
    IndexTuple    ritup;
    BlockNumber lblk;
    BlockNumber rblk;
    OffsetNumber loff;
    OffsetNumber roff;

    /*
     * The first index column is known to be the hash key.  Map each key to
     * its bucket, which masks off the bits we don't want to sort by.
     */
    Assert(!a->isnull1);
    lbucket = _hash_hashkey2bucket(DatumGetUInt32(a->datum1),
                                   state->max_buckets, state->high_mask,
                                   state->low_mask);
    Assert(!b->isnull1);
    rbucket = _hash_hashkey2bucket(DatumGetUInt32(b->datum1),
                                   state->max_buckets, state->high_mask,
                                   state->low_mask);
    if (lbucket != rbucket)
        return (lbucket > rbucket) ? 1 : -1;

    /*
     * Same bucket: fall back to ItemPointer order.  This has no bearing on
     * the validity of the finished index, but it may be useful to have index
     * scans in physical order.
     */
    litup = (IndexTuple) a->tuple;
    ritup = (IndexTuple) b->tuple;

    lblk = ItemPointerGetBlockNumber(&litup->t_tid);
    rblk = ItemPointerGetBlockNumber(&ritup->t_tid);
    if (lblk != rblk)
        return (lblk < rblk) ? -1 : 1;

    loff = ItemPointerGetOffsetNumber(&litup->t_tid);
    roff = ItemPointerGetOffsetNumber(&ritup->t_tid);
    if (loff != roff)
        return (loff < roff) ? -1 : 1;

    return 0;
}

/*
 * Copy an IndexTuple into sort storage and fill in the SortTuple for it.
 *
 * The copy is allocated in state->tuplecontext and charged against the
 * sort's memory budget.  The first index column is extracted into
 * stup->datum1/isnull1, abbreviated when an abbreviation converter exists
 * and abbreviation has not been aborted.
 */
static void
copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup)
{
    IndexTuple    src = (IndexTuple) tup;
    unsigned int nbytes = IndexTupleSize(src);
    IndexTuple    copy;
    Datum        firstkey;
    int            i;

    /* Duplicate the tuple inside the sort's tuple context */
    copy = (IndexTuple) MemoryContextAlloc(state->tuplecontext, nbytes);
    memcpy(copy, src, nbytes);
    USEMEM(state, GetMemoryChunkSpace(copy));
    stup->tuple = (void *) copy;

    /* Fetch the first-column key value */
    firstkey = index_getattr(copy,
                             1,
                             RelationGetDescr(state->indexRel),
                             &stup->isnull1);

    if (stup->isnull1 || !state->sortKeys->abbrev_converter)
    {
        /*
         * No abbreviation in use, or the key is NULL: keep the Datum exactly
         * as index_getattr returned it.  The converter is never invoked for
         * NULL keys.
         */
        stup->datum1 = firstkey;
        return;
    }

    if (!consider_abort_common(state))
    {
        /* Abbreviation still worthwhile: store the abbreviated key */
        stup->datum1 = state->sortKeys->abbrev_converter(firstkey,
                                                         state->sortKeys);
        return;
    }

    /*
     * Abbreviation has been aborted.  The current tuple keeps its ordinary
     * key, and every tuple already in memtuples[] gets its datum1 rewritten
     * from the stored tuple, so that all in-memory entries use the same
     * (non-abbreviated) representation.  Tuples already dumped to tape need
     * no attention, since serialized tuples lack abbreviated keys
     * (TSS_BUILDRUNS state prevents control reaching here in any case).
     */
    stup->datum1 = firstkey;

    for (i = 0; i < state->memtupcount; i++)
    {
        SortTuple  *mtup = &state->memtuples[i];
        IndexTuple    stored = (IndexTuple) mtup->tuple;

        mtup->datum1 = index_getattr(stored,
                                     1,
                                     RelationGetDescr(state->indexRel),
                                     &mtup->isnull1);
    }
}

/*
 * writetup_index
 *
 * Emit one index tuple onto logical tape "tapenum".  The record written is:
 * a length word (tuple size plus the size of the length word itself), then
 * the raw tuple image, and -- only when state->randomAccess is set -- the
 * same length word once more after the tuple.
 *
 * Unless the slab allocator is in use, the in-memory copy of the tuple is
 * freed here and its chunk space is credited back through FREEMEM.
 */
static void
writetup_index(Tuplesortstate *state, int tapenum, SortTuple *stup)
{
    IndexTuple  itup = (IndexTuple) stup->tuple;
    size_t      itupsize = IndexTupleSize(itup);
    unsigned int reclen = itupsize + sizeof(reclen);

    /* leading length word, followed by the tuple body */
    LogicalTapeWrite(state->tapeset, tapenum,
                     (void *) &reclen, sizeof(reclen));
    LogicalTapeWrite(state->tapeset, tapenum,
                     (void *) itup, itupsize);

    /* trailing length word is only written for random-access sorts */
    if (state->randomAccess)
        LogicalTapeWrite(state->tapeset, tapenum,
                         (void *) &reclen, sizeof(reclen));

    /* with the slab allocator active, the tuple memory is not released here */
    if (state->slabAllocatorUsed)
        return;

    FREEMEM(state, GetMemoryChunkSpace(itup));
    pfree(itup);
}

/*
 * readtup_index
 *
 * Load one index tuple from logical tape "tapenum" into *stup.  "len" is the
 * length word already read by the caller; it counts itself, so the tuple
 * body occupies len - sizeof(unsigned int) bytes.  When state->randomAccess
 * is set, the trailing length word is consumed as well.
 *
 * On return stup->tuple points at the freshly read tuple, and
 * datum1/isnull1 hold the value of its first index column.
 */
static void
readtup_index(Tuplesortstate *state, SortTuple *stup,
              int tapenum, unsigned int len)
{
    unsigned int bodylen;
    IndexTuple  itup;

    bodylen = len - sizeof(unsigned int);
    itup = (IndexTuple) readtup_alloc(state, bodylen);

    LogicalTapeReadExact(state->tapeset, tapenum,
                         itup, bodylen);

    /* skip over the trailing length word; the value read is not used */
    if (state->randomAccess)
        LogicalTapeReadExact(state->tapeset, tapenum,
                             &bodylen, sizeof(bodylen));

    stup->tuple = (void *) itup;

    /* cache the leading key column for fast comparisons */
    stup->datum1 = index_getattr(itup,
                                 1,
                                 RelationGetDescr(state->indexRel),
                                 &stup->isnull1);
}

/*
 * Routines specialized for DatumTuple case
 */

/*
 * comparetup_datum
 *
 * Compare two datum-sort entries.  The datum1 fields are compared first with
 * ApplySortComparator.  If that reports equality and the sort key has an
 * abbreviation converter, datum1 held only an abbreviated key, so the tie
 * is resolved by comparing the original values kept in "tuple" with
 * ApplySortAbbrevFullComparator.
 *
 * Returns the comparator result (zero when the entries compare equal).
 */
static int
comparetup_datum(const SortTuple *a, const SortTuple *b, Tuplesortstate *state)
{
    int         result = ApplySortComparator(a->datum1, a->isnull1,
                                             b->datum1, b->isnull1,
                                             state->sortKeys);

    /* abbreviated keys tied: fall back to the full values in "tuple" */
    if (result == 0 && state->sortKeys->abbrev_converter)
    {
        result = ApplySortAbbrevFullComparator(PointerGetDatum(a->tuple),
                                               a->isnull1,
                                               PointerGetDatum(b->tuple),
                                               b->isnull1,
                                               state->sortKeys);
    }

    return result;
}

/*
 * copytup_datum
 *
 * Placeholder for the datum-sort copy callback.  It has no implementation;
 * any call is reported as an error.
 */
static void
copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup)
{
    /* No datum-sort code path is expected to get here */
    elog(ERROR, "copytup_datum() should not be called");
}

/*
 * writetup_datum
 *
 * Emit one datum-sort entry onto logical tape "tapenum".  The record is a
 * length word (payload size plus the size of the length word), the payload,
 * and -- only when state->randomAccess is set -- the length word again.
 *
 * The payload depends on the entry:
 *   - NULL value:           no payload at all (zero bytes);
 *   - state->tuples false:  the sizeof(Datum) bytes of datum1 itself;
 *   - otherwise:            the bytes stup->tuple points at, with the length
 *                           obtained from datumGetSize().
 *
 * Afterwards, if the slab allocator is not in use and the entry owns a
 * tuple, that memory is freed and credited back through FREEMEM.
 */
static void
writetup_datum(Tuplesortstate *state, int tapenum, SortTuple *stup)
{
    void       *src = NULL;
    unsigned int datalen = 0;
    unsigned int reclen;

    /* a NULL value keeps the empty payload set up above */
    if (!stup->isnull1)
    {
        if (state->tuples)
        {
            /* value lives out of line, behind stup->tuple */
            src = stup->tuple;
            datalen = datumGetSize(PointerGetDatum(stup->tuple), false, state->datumTypeLen);
            Assert(datalen != 0);
        }
        else
        {
            /* value is stored directly in datum1 */
            src = &stup->datum1;
            datalen = sizeof(Datum);
        }
    }

    reclen = datalen + sizeof(unsigned int);

    LogicalTapeWrite(state->tapeset, tapenum,
                     (void *) &reclen, sizeof(reclen));
    LogicalTapeWrite(state->tapeset, tapenum,
                     src, datalen);

    /* trailing length word is only written for random-access sorts */
    if (state->randomAccess)
        LogicalTapeWrite(state->tapeset, tapenum,
                         (void *) &reclen, sizeof(reclen));

    /* nothing to release when slab-allocated or when no tuple is attached */
    if (state->slabAllocatorUsed || !stup->tuple)
        return;

    FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
    pfree(stup->tuple);
}

/*
 * readtup_datum
 *
 * Load one datum-sort entry from logical tape "tapenum" into *stup.  "len"
 * is the length word already read by the caller; it counts itself, so the
 * payload is len - sizeof(unsigned int) bytes long.
 *
 *   - zero-length payload:  the entry is NULL;
 *   - state->tuples true:   the payload is read into memory obtained from
 *                           readtup_alloc(), and both datum1 and tuple
 *                           refer to it;
 *   - otherwise:            the payload must be exactly sizeof(Datum) bytes
 *                           and is read straight into datum1.
 *
 * When state->randomAccess is set, the trailing length word is consumed too.
 */
static void
readtup_datum(Tuplesortstate *state, SortTuple *stup,
              int tapenum, unsigned int len)
{
    unsigned int datalen = len - sizeof(unsigned int);

    if (datalen == 0)
    {
        /* empty payload denotes a NULL value */
        stup->tuple = NULL;
        stup->isnull1 = true;
        stup->datum1 = (Datum) 0;
    }
    else if (state->tuples)
    {
        /* out-of-line value: read it into its own buffer */
        void       *buf = readtup_alloc(state, datalen);

        LogicalTapeReadExact(state->tapeset, tapenum,
                             buf, datalen);
        stup->tuple = buf;
        stup->isnull1 = false;
        stup->datum1 = PointerGetDatum(buf);
    }
    else
    {
        /* value fits in the Datum itself; no separate buffer is attached */
        Assert(datalen == sizeof(Datum));
        LogicalTapeReadExact(state->tapeset, tapenum,
                             &stup->datum1, datalen);
        stup->tuple = NULL;
        stup->isnull1 = false;
    }

    /* skip over the trailing length word; the value read is not used */
    if (state->randomAccess)
        LogicalTapeReadExact(state->tapeset, tapenum,
                             &datalen, sizeof(datalen));
}

/*
 * Convenience routine to free a tuple previously loaded into sort memory
 *
 * The chunk space of stup->tuple is credited back to the sort's memory
 * budget through FREEMEM, then the tuple itself is released.
 *
 * A SortTuple need not own a separate tuple: readtup_datum() leaves
 * stup->tuple NULL for NULL values and for values stored directly in datum1.
 * Neither GetMemoryChunkSpace() nor pfree() accepts a NULL pointer, so such
 * entries are skipped.  The pointer is cleared after freeing so that the
 * SortTuple does not keep a dangling reference.
 */
static void
free_sort_tuple(Tuplesortstate *state, SortTuple *stup)
{
    if (stup->tuple)
    {
        FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
        pfree(stup->tuple);
        stup->tuple = NULL;
    }
}
